~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: mbp at sourcefrog
  • Date: 2005-03-24 00:44:18 UTC
  • Revision ID: mbp@sourcefrog.net-20050324004418-b4a050f656c07f5f
show space usage for various stores in the info command

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Implementation of Transport over SFTP, using paramiko."""
18
 
 
19
 
# TODO: Remove the transport-based lock_read and lock_write methods.  They'll
20
 
# then raise TransportNotPossible, which will break remote access to any
21
 
# formats which rely on OS-level locks.  That should be fine as those formats
22
 
# are pretty old, but these combinations may have to be removed from the test
23
 
# suite.  Those formats all date back to 0.7; so we should be able to remove
24
 
# these methods when we officially drop support for those formats.
25
 
 
26
 
import bisect
27
 
import errno
28
 
import itertools
29
 
import os
30
 
import random
31
 
import select
32
 
import socket
33
 
import stat
34
 
import sys
35
 
import time
36
 
import urllib
37
 
import urlparse
38
 
import warnings
39
 
 
40
 
from bzrlib import (
41
 
    config,
42
 
    debug,
43
 
    errors,
44
 
    urlutils,
45
 
    )
46
 
from bzrlib.errors import (FileExists,
47
 
                           NoSuchFile, PathNotChild,
48
 
                           TransportError,
49
 
                           LockError,
50
 
                           PathError,
51
 
                           ParamikoNotPresent,
52
 
                           )
53
 
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
54
 
from bzrlib.symbol_versioning import (
55
 
        deprecated_function,
56
 
        )
57
 
from bzrlib.trace import mutter, warning
58
 
from bzrlib.transport import (
59
 
    FileFileStream,
60
 
    _file_streams,
61
 
    local,
62
 
    Server,
63
 
    ssh,
64
 
    ConnectedTransport,
65
 
    )
66
 
 
67
 
# Disable one particular warning that comes from paramiko in Python2.5; if
68
 
# this is emitted at the wrong time it tends to cause spurious test failures
69
 
# or at least noise in the test case::
70
 
#
71
 
# [1770/7639 in 86s, 1 known failures, 50 skipped, 2 missing features]
72
 
# test_permissions.TestSftpPermissions.test_new_files
73
 
# /var/lib/python-support/python2.5/paramiko/message.py:226: DeprecationWarning: integer argument expected, got float
74
 
#  self.packet.write(struct.pack('>I', n))
75
 
warnings.filterwarnings('ignore',
76
 
        'integer argument expected, got float',
77
 
        category=DeprecationWarning,
78
 
        module='paramiko.message')
79
 
 
80
 
try:
81
 
    import paramiko
82
 
except ImportError, e:
83
 
    raise ParamikoNotPresent(e)
84
 
else:
85
 
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
86
 
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
87
 
                               CMD_HANDLE, CMD_OPEN)
88
 
    from paramiko.sftp_attr import SFTPAttributes
89
 
    from paramiko.sftp_file import SFTPFile
90
 
 
91
 
 
92
 
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
93
 
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
94
 
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
95
 
 
96
 
 
97
 
class SFTPLock(object):
98
 
    """This fakes a lock in a remote location.
99
 
 
100
 
    A present lock is indicated just by the existence of a file.  This
101
 
    doesn't work well on all transports and they are only used in
102
 
    deprecated storage formats.
103
 
    """
104
 
 
105
 
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
106
 
 
107
 
    def __init__(self, path, transport):
108
 
        self.lock_file = None
109
 
        self.path = path
110
 
        self.lock_path = path + '.write-lock'
111
 
        self.transport = transport
112
 
        try:
113
 
            # RBC 20060103 FIXME should we be using private methods here ?
114
 
            abspath = transport._remote_path(self.lock_path)
115
 
            self.lock_file = transport._sftp_open_exclusive(abspath)
116
 
        except FileExists:
117
 
            raise LockError('File %r already locked' % (self.path,))
118
 
 
119
 
    def __del__(self):
120
 
        """Should this warn, or actually try to cleanup?"""
121
 
        if self.lock_file:
122
 
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))
123
 
            self.unlock()
124
 
 
125
 
    def unlock(self):
126
 
        if not self.lock_file:
127
 
            return
128
 
        self.lock_file.close()
129
 
        self.lock_file = None
130
 
        try:
131
 
            self.transport.delete(self.lock_path)
132
 
        except (NoSuchFile,):
133
 
            # What specific errors should we catch here?
134
 
            pass
135
 
 
136
 
 
137
 
class _SFTPReadvHelper(object):
138
 
    """A class to help with managing the state of a readv request."""
139
 
 
140
 
    # See _get_requests for an explanation.
141
 
    _max_request_size = 32768
142
 
 
143
 
    def __init__(self, original_offsets, relpath, _report_activity):
144
 
        """Create a new readv helper.
145
 
 
146
 
        :param original_offsets: The original requests given by the caller of
147
 
            readv()
148
 
        :param relpath: The name of the file (if known)
149
 
        :param _report_activity: A Transport._report_activity bound method,
150
 
            to be called as data arrives.
151
 
        """
152
 
        self.original_offsets = list(original_offsets)
153
 
        self.relpath = relpath
154
 
        self._report_activity = _report_activity
155
 
 
156
 
    def _get_requests(self):
157
 
        """Break up the offsets into individual requests over sftp.
158
 
 
159
 
        The SFTP spec only requires implementers to support 32kB requests. We
160
 
        could try something larger (openssh supports 64kB), but then we have to
161
 
        handle requests that fail.
162
 
        So instead, we just break up our maximum chunks into 32kB chunks, and
163
 
        asyncronously requests them.
164
 
        Newer versions of paramiko would do the chunking for us, but we want to
165
 
        start processing results right away, so we do it ourselves.
166
 
        """
167
 
        # TODO: Because we issue async requests, we don't 'fudge' any extra
168
 
        #       data.  I'm not 100% sure that is the best choice.
169
 
 
170
 
        # The first thing we do, is to collapse the individual requests as much
171
 
        # as possible, so we don't issues requests <32kB
172
 
        sorted_offsets = sorted(self.original_offsets)
173
 
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
174
 
                                                        limit=0, fudge_factor=0))
175
 
        requests = []
176
 
        for c_offset in coalesced:
177
 
            start = c_offset.start
178
 
            size = c_offset.length
179
 
 
180
 
            # Break this up into 32kB requests
181
 
            while size > 0:
182
 
                next_size = min(size, self._max_request_size)
183
 
                requests.append((start, next_size))
184
 
                size -= next_size
185
 
                start += next_size
186
 
        if 'sftp' in debug.debug_flags:
187
 
            mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
188
 
                self.relpath, len(sorted_offsets), len(coalesced),
189
 
                len(requests))
190
 
        return requests
191
 
 
192
 
    def request_and_yield_offsets(self, fp):
193
 
        """Request the data from the remote machine, yielding the results.
194
 
 
195
 
        :param fp: A Paramiko SFTPFile object that supports readv.
196
 
        :return: Yield the data requested by the original readv caller, one by
197
 
            one.
198
 
        """
199
 
        requests = self._get_requests()
200
 
        offset_iter = iter(self.original_offsets)
201
 
        cur_offset, cur_size = offset_iter.next()
202
 
        # paramiko .readv() yields strings that are in the order of the requests
203
 
        # So we track the current request to know where the next data is
204
 
        # being returned from.
205
 
        input_start = None
206
 
        last_end = None
207
 
        buffered_data = []
208
 
        buffered_len = 0
209
 
 
210
 
        # This is used to buffer chunks which we couldn't process yet
211
 
        # It is (start, end, data) tuples.
212
 
        data_chunks = []
213
 
        # Create an 'unlimited' data stream, so we stop based on requests,
214
 
        # rather than just because the data stream ended. This lets us detect
215
 
        # short readv.
216
 
        data_stream = itertools.chain(fp.readv(requests),
217
 
                                      itertools.repeat(None))
218
 
        for (start, length), data in itertools.izip(requests, data_stream):
219
 
            if data is None:
220
 
                if cur_coalesced is not None:
221
 
                    raise errors.ShortReadvError(self.relpath,
222
 
                        start, length, len(data))
223
 
            if len(data) != length:
224
 
                raise errors.ShortReadvError(self.relpath,
225
 
                    start, length, len(data))
226
 
            self._report_activity(length, 'read')
227
 
            if last_end is None:
228
 
                # This is the first request, just buffer it
229
 
                buffered_data = [data]
230
 
                buffered_len = length
231
 
                input_start = start
232
 
            elif start == last_end:
233
 
                # The data we are reading fits neatly on the previous
234
 
                # buffer, so this is all part of a larger coalesced range.
235
 
                buffered_data.append(data)
236
 
                buffered_len += length
237
 
            else:
238
 
                # We have an 'interrupt' in the data stream. So we know we are
239
 
                # at a request boundary.
240
 
                if buffered_len > 0:
241
 
                    # We haven't consumed the buffer so far, so put it into
242
 
                    # data_chunks, and continue.
243
 
                    buffered = ''.join(buffered_data)
244
 
                    data_chunks.append((input_start, buffered))
245
 
                input_start = start
246
 
                buffered_data = [data]
247
 
                buffered_len = length
248
 
            last_end = start + length
249
 
            if input_start == cur_offset and cur_size <= buffered_len:
250
 
                # Simplify the next steps a bit by transforming buffered_data
251
 
                # into a single string. We also have the nice property that
252
 
                # when there is only one string ''.join([x]) == x, so there is
253
 
                # no data copying.
254
 
                buffered = ''.join(buffered_data)
255
 
                # Clean out buffered data so that we keep memory
256
 
                # consumption low
257
 
                del buffered_data[:]
258
 
                buffered_offset = 0
259
 
                # TODO: We *could* also consider the case where cur_offset is in
260
 
                #       in the buffered range, even though it doesn't *start*
261
 
                #       the buffered range. But for packs we pretty much always
262
 
                #       read in order, so you won't get any extra data in the
263
 
                #       middle.
264
 
                while (input_start == cur_offset
265
 
                       and (buffered_offset + cur_size) <= buffered_len):
266
 
                    # We've buffered enough data to process this request, spit it
267
 
                    # out
268
 
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
269
 
                    # move the direct pointer into our buffered data
270
 
                    buffered_offset += cur_size
271
 
                    # Move the start-of-buffer pointer
272
 
                    input_start += cur_size
273
 
                    # Yield the requested data
274
 
                    yield cur_offset, cur_data
275
 
                    cur_offset, cur_size = offset_iter.next()
276
 
                # at this point, we've consumed as much of buffered as we can,
277
 
                # so break off the portion that we consumed
278
 
                if buffered_offset == len(buffered_data):
279
 
                    # No tail to leave behind
280
 
                    buffered_data = []
281
 
                    buffered_len = 0
282
 
                else:
283
 
                    buffered = buffered[buffered_offset:]
284
 
                    buffered_data = [buffered]
285
 
                    buffered_len = len(buffered)
286
 
        if buffered_len:
287
 
            buffered = ''.join(buffered_data)
288
 
            del buffered_data[:]
289
 
            data_chunks.append((input_start, buffered))
290
 
        if data_chunks:
291
 
            if 'sftp' in debug.debug_flags:
292
 
                mutter('SFTP readv left with %d out-of-order bytes',
293
 
                    sum(map(lambda x: len(x[1]), data_chunks)))
294
 
            # We've processed all the readv data, at this point, anything we
295
 
            # couldn't process is in data_chunks. This doesn't happen often, so
296
 
            # this code path isn't optimized
297
 
            # We use an interesting process for data_chunks
298
 
            # Specifically if we have "bisect_left([(start, len, entries)],
299
 
            #                                       (qstart,)])
300
 
            # If start == qstart, then we get the specific node. Otherwise we
301
 
            # get the previous node
302
 
            while True:
303
 
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
304
 
                if idx < len(data_chunks) and data_chunks[idx][0] == cur_offset:
305
 
                    # The data starts here
306
 
                    data = data_chunks[idx][1][:cur_size]
307
 
                elif idx > 0:
308
 
                    # The data is in a portion of a previous page
309
 
                    idx -= 1
310
 
                    sub_offset = cur_offset - data_chunks[idx][0]
311
 
                    data = data_chunks[idx][1]
312
 
                    data = data[sub_offset:sub_offset + cur_size]
313
 
                else:
314
 
                    # We are missing the page where the data should be found,
315
 
                    # something is wrong
316
 
                    data = ''
317
 
                if len(data) != cur_size:
318
 
                    raise AssertionError('We must have miscalulated.'
319
 
                        ' We expected %d bytes, but only found %d'
320
 
                        % (cur_size, len(data)))
321
 
                yield cur_offset, data
322
 
                cur_offset, cur_size = offset_iter.next()
323
 
 
324
 
 
325
 
class SFTPTransport(ConnectedTransport):
326
 
    """Transport implementation for SFTP access."""
327
 
 
328
 
    _do_prefetch = _default_do_prefetch
329
 
    # TODO: jam 20060717 Conceivably these could be configurable, either
330
 
    #       by auto-tuning at run-time, or by a configuration (per host??)
331
 
    #       but the performance curve is pretty flat, so just going with
332
 
    #       reasonable defaults.
333
 
    _max_readv_combine = 200
334
 
    # Having to round trip to the server means waiting for a response,
335
 
    # so it is better to download extra bytes.
336
 
    # 8KiB had good performance for both local and remote network operations
337
 
    _bytes_to_read_before_seek = 8192
338
 
 
339
 
    # The sftp spec says that implementations SHOULD allow reads
340
 
    # to be at least 32K. paramiko.readv() does an async request
341
 
    # for the chunks. So we need to keep it within a single request
342
 
    # size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
343
 
    # up the request itself, rather than us having to worry about it
344
 
    _max_request_size = 32768
345
 
 
346
 
    def __init__(self, base, _from_transport=None):
347
 
        super(SFTPTransport, self).__init__(base,
348
 
                                            _from_transport=_from_transport)
349
 
 
350
 
    def _remote_path(self, relpath):
351
 
        """Return the path to be passed along the sftp protocol for relpath.
352
 
 
353
 
        :param relpath: is a urlencoded string.
354
 
        """
355
 
        relative = urlutils.unescape(relpath).encode('utf-8')
356
 
        remote_path = self._combine_paths(self._path, relative)
357
 
        # the initial slash should be removed from the path, and treated as a
358
 
        # homedir relative path (the path begins with a double slash if it is
359
 
        # absolute).  see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
360
 
        # RBC 20060118 we are not using this as its too user hostile. instead
361
 
        # we are following lftp and using /~/foo to mean '~/foo'
362
 
        # vila--20070602 and leave absolute paths begin with a single slash.
363
 
        if remote_path.startswith('/~/'):
364
 
            remote_path = remote_path[3:]
365
 
        elif remote_path == '/~':
366
 
            remote_path = ''
367
 
        return remote_path
368
 
 
369
 
    def _create_connection(self, credentials=None):
370
 
        """Create a new connection with the provided credentials.
371
 
 
372
 
        :param credentials: The credentials needed to establish the connection.
373
 
 
374
 
        :return: The created connection and its associated credentials.
375
 
 
376
 
        The credentials are only the password as it may have been entered
377
 
        interactively by the user and may be different from the one provided
378
 
        in base url at transport creation time.
379
 
        """
380
 
        if credentials is None:
381
 
            password = self._password
382
 
        else:
383
 
            password = credentials
384
 
 
385
 
        vendor = ssh._get_ssh_vendor()
386
 
        user = self._user
387
 
        if user is None:
388
 
            auth = config.AuthenticationConfig()
389
 
            user = auth.get_user('ssh', self._host, self._port)
390
 
        connection = vendor.connect_sftp(self._user, password,
391
 
                                         self._host, self._port)
392
 
        return connection, (user, password)
393
 
 
394
 
    def _get_sftp(self):
395
 
        """Ensures that a connection is established"""
396
 
        connection = self._get_connection()
397
 
        if connection is None:
398
 
            # First connection ever
399
 
            connection, credentials = self._create_connection()
400
 
            self._set_connection(connection, credentials)
401
 
        return connection
402
 
 
403
 
    def has(self, relpath):
404
 
        """
405
 
        Does the target location exist?
406
 
        """
407
 
        try:
408
 
            self._get_sftp().stat(self._remote_path(relpath))
409
 
            # stat result is about 20 bytes, let's say
410
 
            self._report_activity(20, 'read')
411
 
            return True
412
 
        except IOError:
413
 
            return False
414
 
 
415
 
    def get(self, relpath):
416
 
        """Get the file at the given relative path.
417
 
 
418
 
        :param relpath: The relative path to the file
419
 
        """
420
 
        try:
421
 
            # FIXME: by returning the file directly, we don't pass this
422
 
            # through to report_activity.  We could try wrapping the object
423
 
            # before it's returned.  For readv and get_bytes it's handled in
424
 
            # the higher-level function.
425
 
            # -- mbp 20090126
426
 
            path = self._remote_path(relpath)
427
 
            f = self._get_sftp().file(path, mode='rb')
428
 
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
429
 
                f.prefetch()
430
 
            return f
431
 
        except (IOError, paramiko.SSHException), e:
432
 
            self._translate_io_exception(e, path, ': error retrieving',
433
 
                failure_exc=errors.ReadError)
434
 
 
435
 
    def get_bytes(self, relpath):
436
 
        # reimplement this here so that we can report how many bytes came back
437
 
        f = self.get(relpath)
438
 
        try:
439
 
            bytes = f.read()
440
 
            self._report_activity(len(bytes), 'read')
441
 
            return bytes
442
 
        finally:
443
 
            f.close()
444
 
 
445
 
    def _readv(self, relpath, offsets):
446
 
        """See Transport.readv()"""
447
 
        # We overload the default readv() because we want to use a file
448
 
        # that does not have prefetch enabled.
449
 
        # Also, if we have a new paramiko, it implements an async readv()
450
 
        if not offsets:
451
 
            return
452
 
 
453
 
        try:
454
 
            path = self._remote_path(relpath)
455
 
            fp = self._get_sftp().file(path, mode='rb')
456
 
            readv = getattr(fp, 'readv', None)
457
 
            if readv:
458
 
                return self._sftp_readv(fp, offsets, relpath)
459
 
            if 'sftp' in debug.debug_flags:
460
 
                mutter('seek and read %s offsets', len(offsets))
461
 
            return self._seek_and_read(fp, offsets, relpath)
462
 
        except (IOError, paramiko.SSHException), e:
463
 
            self._translate_io_exception(e, path, ': error retrieving')
464
 
 
465
 
    def recommended_page_size(self):
466
 
        """See Transport.recommended_page_size().
467
 
 
468
 
        For SFTP we suggest a large page size to reduce the overhead
469
 
        introduced by latency.
470
 
        """
471
 
        return 64 * 1024
472
 
 
473
 
    def _sftp_readv(self, fp, offsets, relpath):
474
 
        """Use the readv() member of fp to do async readv.
475
 
 
476
 
        Then read them using paramiko.readv(). paramiko.readv()
477
 
        does not support ranges > 64K, so it caps the request size, and
478
 
        just reads until it gets all the stuff it wants.
479
 
        """
480
 
        helper = _SFTPReadvHelper(offsets, relpath, self._report_activity)
481
 
        return helper.request_and_yield_offsets(fp)
482
 
 
483
 
    def put_file(self, relpath, f, mode=None):
484
 
        """
485
 
        Copy the file-like object into the location.
486
 
 
487
 
        :param relpath: Location to put the contents, relative to base.
488
 
        :param f:       File-like object.
489
 
        :param mode: The final mode for the file
490
 
        """
491
 
        final_path = self._remote_path(relpath)
492
 
        return self._put(final_path, f, mode=mode)
493
 
 
494
 
    def _put(self, abspath, f, mode=None):
495
 
        """Helper function so both put() and copy_abspaths can reuse the code"""
496
 
        tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
497
 
                        os.getpid(), random.randint(0,0x7FFFFFFF))
498
 
        fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
499
 
        closed = False
500
 
        try:
501
 
            try:
502
 
                fout.set_pipelined(True)
503
 
                length = self._pump(f, fout)
504
 
            except (IOError, paramiko.SSHException), e:
505
 
                self._translate_io_exception(e, tmp_abspath)
506
 
            # XXX: This doesn't truly help like we would like it to.
507
 
            #      The problem is that openssh strips sticky bits. So while we
508
 
            #      can properly set group write permission, we lose the group
509
 
            #      sticky bit. So it is probably best to stop chmodding, and
510
 
            #      just tell users that they need to set the umask correctly.
511
 
            #      The attr.st_mode = mode, in _sftp_open_exclusive
512
 
            #      will handle when the user wants the final mode to be more
513
 
            #      restrictive. And then we avoid a round trip. Unless
514
 
            #      paramiko decides to expose an async chmod()
515
 
 
516
 
            # This is designed to chmod() right before we close.
517
 
            # Because we set_pipelined() earlier, theoretically we might
518
 
            # avoid the round trip for fout.close()
519
 
            if mode is not None:
520
 
                self._get_sftp().chmod(tmp_abspath, mode)
521
 
            fout.close()
522
 
            closed = True
523
 
            self._rename_and_overwrite(tmp_abspath, abspath)
524
 
            return length
525
 
        except Exception, e:
526
 
            # If we fail, try to clean up the temporary file
527
 
            # before we throw the exception
528
 
            # but don't let another exception mess things up
529
 
            # Write out the traceback, because otherwise
530
 
            # the catch and throw destroys it
531
 
            import traceback
532
 
            mutter(traceback.format_exc())
533
 
            try:
534
 
                if not closed:
535
 
                    fout.close()
536
 
                self._get_sftp().remove(tmp_abspath)
537
 
            except:
538
 
                # raise the saved except
539
 
                raise e
540
 
            # raise the original with its traceback if we can.
541
 
            raise
542
 
 
543
 
    def _put_non_atomic_helper(self, relpath, writer, mode=None,
544
 
                               create_parent_dir=False,
545
 
                               dir_mode=None):
546
 
        abspath = self._remote_path(relpath)
547
 
 
548
 
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
549
 
        #       set the file mode at create time. If it does, use it.
550
 
        #       But for now, we just chmod later anyway.
551
 
 
552
 
        def _open_and_write_file():
553
 
            """Try to open the target file, raise error on failure"""
554
 
            fout = None
555
 
            try:
556
 
                try:
557
 
                    fout = self._get_sftp().file(abspath, mode='wb')
558
 
                    fout.set_pipelined(True)
559
 
                    writer(fout)
560
 
                except (paramiko.SSHException, IOError), e:
561
 
                    self._translate_io_exception(e, abspath,
562
 
                                                 ': unable to open')
563
 
 
564
 
                # This is designed to chmod() right before we close.
565
 
                # Because we set_pipelined() earlier, theoretically we might
566
 
                # avoid the round trip for fout.close()
567
 
                if mode is not None:
568
 
                    self._get_sftp().chmod(abspath, mode)
569
 
            finally:
570
 
                if fout is not None:
571
 
                    fout.close()
572
 
 
573
 
        if not create_parent_dir:
574
 
            _open_and_write_file()
575
 
            return
576
 
 
577
 
        # Try error handling to create the parent directory if we need to
578
 
        try:
579
 
            _open_and_write_file()
580
 
        except NoSuchFile:
581
 
            # Try to create the parent directory, and then go back to
582
 
            # writing the file
583
 
            parent_dir = os.path.dirname(abspath)
584
 
            self._mkdir(parent_dir, dir_mode)
585
 
            _open_and_write_file()
586
 
 
587
 
    def put_file_non_atomic(self, relpath, f, mode=None,
588
 
                            create_parent_dir=False,
589
 
                            dir_mode=None):
590
 
        """Copy the file-like object into the target location.
591
 
 
592
 
        This function is not strictly safe to use. It is only meant to
593
 
        be used when you already know that the target does not exist.
594
 
        It is not safe, because it will open and truncate the remote
595
 
        file. So there may be a time when the file has invalid contents.
596
 
 
597
 
        :param relpath: The remote location to put the contents.
598
 
        :param f:       File-like object.
599
 
        :param mode:    Possible access permissions for new file.
600
 
                        None means do not set remote permissions.
601
 
        :param create_parent_dir: If we cannot create the target file because
602
 
                        the parent directory does not exist, go ahead and
603
 
                        create it, and then try again.
604
 
        """
605
 
        def writer(fout):
606
 
            self._pump(f, fout)
607
 
        self._put_non_atomic_helper(relpath, writer, mode=mode,
608
 
                                    create_parent_dir=create_parent_dir,
609
 
                                    dir_mode=dir_mode)
610
 
 
611
 
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
612
 
                             create_parent_dir=False,
613
 
                             dir_mode=None):
614
 
        def writer(fout):
615
 
            fout.write(bytes)
616
 
        self._put_non_atomic_helper(relpath, writer, mode=mode,
617
 
                                    create_parent_dir=create_parent_dir,
618
 
                                    dir_mode=dir_mode)
619
 
 
620
 
    def iter_files_recursive(self):
621
 
        """Walk the relative paths of all files in this transport."""
622
 
        # progress is handled by list_dir
623
 
        queue = list(self.list_dir('.'))
624
 
        while queue:
625
 
            relpath = queue.pop(0)
626
 
            st = self.stat(relpath)
627
 
            if stat.S_ISDIR(st.st_mode):
628
 
                for i, basename in enumerate(self.list_dir(relpath)):
629
 
                    queue.insert(i, relpath+'/'+basename)
630
 
            else:
631
 
                yield relpath
632
 
 
633
 
    def _mkdir(self, abspath, mode=None):
634
 
        if mode is None:
635
 
            local_mode = 0777
636
 
        else:
637
 
            local_mode = mode
638
 
        try:
639
 
            self._report_activity(len(abspath), 'write')
640
 
            self._get_sftp().mkdir(abspath, local_mode)
641
 
            self._report_activity(1, 'read')
642
 
            if mode is not None:
643
 
                # chmod a dir through sftp will erase any sgid bit set
644
 
                # on the server side.  So, if the bit mode are already
645
 
                # set, avoid the chmod.  If the mode is not fine but
646
 
                # the sgid bit is set, report a warning to the user
647
 
                # with the umask fix.
648
 
                stat = self._get_sftp().lstat(abspath)
649
 
                mode = mode & 0777 # can't set special bits anyway
650
 
                if mode != stat.st_mode & 0777:
651
 
                    if stat.st_mode & 06000:
652
 
                        warning('About to chmod %s over sftp, which will result'
653
 
                                ' in its suid or sgid bits being cleared.  If'
654
 
                                ' you want to preserve those bits, change your '
655
 
                                ' environment on the server to use umask 0%03o.'
656
 
                                % (abspath, 0777 - mode))
657
 
                    self._get_sftp().chmod(abspath, mode=mode)
658
 
        except (paramiko.SSHException, IOError), e:
659
 
            self._translate_io_exception(e, abspath, ': unable to mkdir',
660
 
                failure_exc=FileExists)
661
 
 
662
 
    def mkdir(self, relpath, mode=None):
663
 
        """Create a directory at the given path."""
664
 
        self._mkdir(self._remote_path(relpath), mode=mode)
665
 
 
666
 
    def open_write_stream(self, relpath, mode=None):
667
 
        """See Transport.open_write_stream."""
668
 
        # initialise the file to zero-length
669
 
        # this is three round trips, but we don't use this
670
 
        # api more than once per write_group at the moment so
671
 
        # it is a tolerable overhead. Better would be to truncate
672
 
        # the file after opening. RBC 20070805
673
 
        self.put_bytes_non_atomic(relpath, "", mode)
674
 
        abspath = self._remote_path(relpath)
675
 
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
676
 
        #       set the file mode at create time. If it does, use it.
677
 
        #       But for now, we just chmod later anyway.
678
 
        handle = None
679
 
        try:
680
 
            handle = self._get_sftp().file(abspath, mode='wb')
681
 
            handle.set_pipelined(True)
682
 
        except (paramiko.SSHException, IOError), e:
683
 
            self._translate_io_exception(e, abspath,
684
 
                                         ': unable to open')
685
 
        _file_streams[self.abspath(relpath)] = handle
686
 
        return FileFileStream(self, relpath, handle)
687
 
 
688
 
    def _translate_io_exception(self, e, path, more_info='',
689
 
                                failure_exc=PathError):
690
 
        """Translate a paramiko or IOError into a friendlier exception.
691
 
 
692
 
        :param e: The original exception
693
 
        :param path: The path in question when the error is raised
694
 
        :param more_info: Extra information that can be included,
695
 
                          such as what was going on
696
 
        :param failure_exc: Paramiko has the super fun ability to raise completely
697
 
                           opaque errors that just set "e.args = ('Failure',)" with
698
 
                           no more information.
699
 
                           If this parameter is set, it defines the exception
700
 
                           to raise in these cases.
701
 
        """
702
 
        # paramiko seems to generate detailless errors.
703
 
        self._translate_error(e, path, raise_generic=False)
704
 
        if getattr(e, 'args', None) is not None:
705
 
            if (e.args == ('No such file or directory',) or
706
 
                e.args == ('No such file',)):
707
 
                raise NoSuchFile(path, str(e) + more_info)
708
 
            if (e.args == ('mkdir failed',) or
709
 
                e.args[0].startswith('syserr: File exists')):
710
 
                raise FileExists(path, str(e) + more_info)
711
 
            # strange but true, for the paramiko server.
712
 
            if (e.args == ('Failure',)):
713
 
                raise failure_exc(path, str(e) + more_info)
714
 
            mutter('Raising exception with args %s', e.args)
715
 
        if getattr(e, 'errno', None) is not None:
716
 
            mutter('Raising exception with errno %s', e.errno)
717
 
        raise e
718
 
 
719
 
    def append_file(self, relpath, f, mode=None):
720
 
        """
721
 
        Append the text in the file-like object into the final
722
 
        location.
723
 
        """
724
 
        try:
725
 
            path = self._remote_path(relpath)
726
 
            fout = self._get_sftp().file(path, 'ab')
727
 
            if mode is not None:
728
 
                self._get_sftp().chmod(path, mode)
729
 
            result = fout.tell()
730
 
            self._pump(f, fout)
731
 
            return result
732
 
        except (IOError, paramiko.SSHException), e:
733
 
            self._translate_io_exception(e, relpath, ': unable to append')
734
 
 
735
 
    def rename(self, rel_from, rel_to):
736
 
        """Rename without special overwriting"""
737
 
        try:
738
 
            self._get_sftp().rename(self._remote_path(rel_from),
739
 
                              self._remote_path(rel_to))
740
 
        except (IOError, paramiko.SSHException), e:
741
 
            self._translate_io_exception(e, rel_from,
742
 
                    ': unable to rename to %r' % (rel_to))
743
 
 
744
 
    def _rename_and_overwrite(self, abs_from, abs_to):
745
 
        """Do a fancy rename on the remote server.
746
 
 
747
 
        Using the implementation provided by osutils.
748
 
        """
749
 
        try:
750
 
            sftp = self._get_sftp()
751
 
            fancy_rename(abs_from, abs_to,
752
 
                         rename_func=sftp.rename,
753
 
                         unlink_func=sftp.remove)
754
 
        except (IOError, paramiko.SSHException), e:
755
 
            self._translate_io_exception(e, abs_from,
756
 
                                         ': unable to rename to %r' % (abs_to))
757
 
 
758
 
    def move(self, rel_from, rel_to):
759
 
        """Move the item at rel_from to the location at rel_to"""
760
 
        path_from = self._remote_path(rel_from)
761
 
        path_to = self._remote_path(rel_to)
762
 
        self._rename_and_overwrite(path_from, path_to)
763
 
 
764
 
    def delete(self, relpath):
765
 
        """Delete the item at relpath"""
766
 
        path = self._remote_path(relpath)
767
 
        try:
768
 
            self._get_sftp().remove(path)
769
 
        except (IOError, paramiko.SSHException), e:
770
 
            self._translate_io_exception(e, path, ': unable to delete')
771
 
 
772
 
    def external_url(self):
773
 
        """See bzrlib.transport.Transport.external_url."""
774
 
        # the external path for SFTP is the base
775
 
        return self.base
776
 
 
777
 
    def listable(self):
778
 
        """Return True if this store supports listing."""
779
 
        return True
780
 
 
781
 
    def list_dir(self, relpath):
782
 
        """
783
 
        Return a list of all files at the given location.
784
 
        """
785
 
        # does anything actually use this?
786
 
        # -- Unknown
787
 
        # This is at least used by copy_tree for remote upgrades.
788
 
        # -- David Allouche 2006-08-11
789
 
        path = self._remote_path(relpath)
790
 
        try:
791
 
            entries = self._get_sftp().listdir(path)
792
 
            self._report_activity(sum(map(len, entries)), 'read')
793
 
        except (IOError, paramiko.SSHException), e:
794
 
            self._translate_io_exception(e, path, ': failed to list_dir')
795
 
        return [urlutils.escape(entry) for entry in entries]
796
 
 
797
 
    def rmdir(self, relpath):
798
 
        """See Transport.rmdir."""
799
 
        path = self._remote_path(relpath)
800
 
        try:
801
 
            return self._get_sftp().rmdir(path)
802
 
        except (IOError, paramiko.SSHException), e:
803
 
            self._translate_io_exception(e, path, ': failed to rmdir')
804
 
 
805
 
    def stat(self, relpath):
806
 
        """Return the stat information for a file."""
807
 
        path = self._remote_path(relpath)
808
 
        try:
809
 
            return self._get_sftp().stat(path)
810
 
        except (IOError, paramiko.SSHException), e:
811
 
            self._translate_io_exception(e, path, ': unable to stat')
812
 
 
813
 
    def lock_read(self, relpath):
814
 
        """
815
 
        Lock the given file for shared (read) access.
816
 
        :return: A lock object, which has an unlock() member function
817
 
        """
818
 
        # FIXME: there should be something clever i can do here...
819
 
        class BogusLock(object):
820
 
            def __init__(self, path):
821
 
                self.path = path
822
 
            def unlock(self):
823
 
                pass
824
 
        return BogusLock(relpath)
825
 
 
826
 
    def lock_write(self, relpath):
827
 
        """
828
 
        Lock the given file for exclusive (write) access.
829
 
        WARNING: many transports do not support this, so trying avoid using it
830
 
 
831
 
        :return: A lock object, which has an unlock() member function
832
 
        """
833
 
        # This is a little bit bogus, but basically, we create a file
834
 
        # which should not already exist, and if it does, we assume
835
 
        # that there is a lock, and if it doesn't, the we assume
836
 
        # that we have taken the lock.
837
 
        return SFTPLock(relpath, self)
838
 
 
839
 
    def _sftp_open_exclusive(self, abspath, mode=None):
840
 
        """Open a remote path exclusively.
841
 
 
842
 
        SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
843
 
        the file already exists. However it does not expose this
844
 
        at the higher level of SFTPClient.open(), so we have to
845
 
        sneak away with it.
846
 
 
847
 
        WARNING: This breaks the SFTPClient abstraction, so it
848
 
        could easily break against an updated version of paramiko.
849
 
 
850
 
        :param abspath: The remote absolute path where the file should be opened
851
 
        :param mode: The mode permissions bits for the new file
852
 
        """
853
 
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
854
 
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
855
 
        #       However, there is no way to set the permission mode at open
856
 
        #       time using the sftp_client.file() functionality.
857
 
        path = self._get_sftp()._adjust_cwd(abspath)
858
 
        # mutter('sftp abspath %s => %s', abspath, path)
859
 
        attr = SFTPAttributes()
860
 
        if mode is not None:
861
 
            attr.st_mode = mode
862
 
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
863
 
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
864
 
        try:
865
 
            t, msg = self._get_sftp()._request(CMD_OPEN, path, omode, attr)
866
 
            if t != CMD_HANDLE:
867
 
                raise TransportError('Expected an SFTP handle')
868
 
            handle = msg.get_string()
869
 
            return SFTPFile(self._get_sftp(), handle, 'wb', -1)
870
 
        except (paramiko.SSHException, IOError), e:
871
 
            self._translate_io_exception(e, abspath, ': unable to open',
872
 
                failure_exc=FileExists)
873
 
 
874
 
    def _can_roundtrip_unix_modebits(self):
875
 
        if sys.platform == 'win32':
876
 
            # anyone else?
877
 
            return False
878
 
        else:
879
 
            return True
880
 
 
881
 
# ------------- server test implementation --------------
882
 
import threading
883
 
 
884
 
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
885
 
 
886
 
STUB_SERVER_KEY = """
887
 
-----BEGIN RSA PRIVATE KEY-----
888
 
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
889
 
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
890
 
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
891
 
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
892
 
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
893
 
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
894
 
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
895
 
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
896
 
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
897
 
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
898
 
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
899
 
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
900
 
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
901
 
-----END RSA PRIVATE KEY-----
902
 
"""
903
 
 
904
 
 
905
 
class SocketListener(threading.Thread):
906
 
 
907
 
    def __init__(self, callback):
908
 
        threading.Thread.__init__(self)
909
 
        self._callback = callback
910
 
        self._socket = socket.socket()
911
 
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
912
 
        self._socket.bind(('localhost', 0))
913
 
        self._socket.listen(1)
914
 
        self.port = self._socket.getsockname()[1]
915
 
        self._stop_event = threading.Event()
916
 
 
917
 
    def stop(self):
918
 
        # called from outside this thread
919
 
        self._stop_event.set()
920
 
        # use a timeout here, because if the test fails, the server thread may
921
 
        # never notice the stop_event.
922
 
        self.join(5.0)
923
 
        self._socket.close()
924
 
 
925
 
    def run(self):
926
 
        while True:
927
 
            readable, writable_unused, exception_unused = \
928
 
                select.select([self._socket], [], [], 0.1)
929
 
            if self._stop_event.isSet():
930
 
                return
931
 
            if len(readable) == 0:
932
 
                continue
933
 
            try:
934
 
                s, addr_unused = self._socket.accept()
935
 
                # because the loopback socket is inline, and transports are
936
 
                # never explicitly closed, best to launch a new thread.
937
 
                threading.Thread(target=self._callback, args=(s,)).start()
938
 
            except socket.error, x:
939
 
                sys.excepthook(*sys.exc_info())
940
 
                warning('Socket error during accept() within unit test server'
941
 
                        ' thread: %r' % x)
942
 
            except Exception, x:
943
 
                # probably a failed test; unit test thread will log the
944
 
                # failure/error
945
 
                sys.excepthook(*sys.exc_info())
946
 
                warning('Exception from within unit test server thread: %r' %
947
 
                        x)
948
 
 
949
 
 
950
 
class SocketDelay(object):
951
 
    """A socket decorator to make TCP appear slower.
952
 
 
953
 
    This changes recv, send, and sendall to add a fixed latency to each python
954
 
    call if a new roundtrip is detected. That is, when a recv is called and the
955
 
    flag new_roundtrip is set, latency is charged. Every send and send_all
956
 
    sets this flag.
957
 
 
958
 
    In addition every send, sendall and recv sleeps a bit per character send to
959
 
    simulate bandwidth.
960
 
 
961
 
    Not all methods are implemented, this is deliberate as this class is not a
962
 
    replacement for the builtin sockets layer. fileno is not implemented to
963
 
    prevent the proxy being bypassed.
964
 
    """
965
 
 
966
 
    simulated_time = 0
967
 
    _proxied_arguments = dict.fromkeys([
968
 
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
969
 
        "setblocking", "setsockopt", "settimeout", "shutdown"])
970
 
 
971
 
    def __init__(self, sock, latency, bandwidth=1.0,
972
 
                 really_sleep=True):
973
 
        """
974
 
        :param bandwith: simulated bandwith (MegaBit)
975
 
        :param really_sleep: If set to false, the SocketDelay will just
976
 
        increase a counter, instead of calling time.sleep. This is useful for
977
 
        unittesting the SocketDelay.
978
 
        """
979
 
        self.sock = sock
980
 
        self.latency = latency
981
 
        self.really_sleep = really_sleep
982
 
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
983
 
        self.new_roundtrip = False
984
 
 
985
 
    def sleep(self, s):
986
 
        if self.really_sleep:
987
 
            time.sleep(s)
988
 
        else:
989
 
            SocketDelay.simulated_time += s
990
 
 
991
 
    def __getattr__(self, attr):
992
 
        if attr in SocketDelay._proxied_arguments:
993
 
            return getattr(self.sock, attr)
994
 
        raise AttributeError("'SocketDelay' object has no attribute %r" %
995
 
                             attr)
996
 
 
997
 
    def dup(self):
998
 
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
999
 
                           self._sleep)
1000
 
 
1001
 
    def recv(self, *args):
1002
 
        data = self.sock.recv(*args)
1003
 
        if data and self.new_roundtrip:
1004
 
            self.new_roundtrip = False
1005
 
            self.sleep(self.latency)
1006
 
        self.sleep(len(data) * self.time_per_byte)
1007
 
        return data
1008
 
 
1009
 
    def sendall(self, data, flags=0):
1010
 
        if not self.new_roundtrip:
1011
 
            self.new_roundtrip = True
1012
 
            self.sleep(self.latency)
1013
 
        self.sleep(len(data) * self.time_per_byte)
1014
 
        return self.sock.sendall(data, flags)
1015
 
 
1016
 
    def send(self, data, flags=0):
1017
 
        if not self.new_roundtrip:
1018
 
            self.new_roundtrip = True
1019
 
            self.sleep(self.latency)
1020
 
        bytes_sent = self.sock.send(data, flags)
1021
 
        self.sleep(bytes_sent * self.time_per_byte)
1022
 
        return bytes_sent
1023
 
 
1024
 
 
1025
 
class SFTPServer(Server):
1026
 
    """Common code for SFTP server facilities."""
1027
 
 
1028
 
    def __init__(self, server_interface=StubServer):
1029
 
        self._original_vendor = None
1030
 
        self._homedir = None
1031
 
        self._server_homedir = None
1032
 
        self._listener = None
1033
 
        self._root = None
1034
 
        self._vendor = ssh.ParamikoVendor()
1035
 
        self._server_interface = server_interface
1036
 
        # sftp server logs
1037
 
        self.logs = []
1038
 
        self.add_latency = 0
1039
 
 
1040
 
    def _get_sftp_url(self, path):
1041
 
        """Calculate an sftp url to this server for path."""
1042
 
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
1043
 
 
1044
 
    def log(self, message):
1045
 
        """StubServer uses this to log when a new server is created."""
1046
 
        self.logs.append(message)
1047
 
 
1048
 
    def _run_server_entry(self, sock):
1049
 
        """Entry point for all implementations of _run_server.
1050
 
 
1051
 
        If self.add_latency is > 0.000001 then sock is given a latency adding
1052
 
        decorator.
1053
 
        """
1054
 
        if self.add_latency > 0.000001:
1055
 
            sock = SocketDelay(sock, self.add_latency)
1056
 
        return self._run_server(sock)
1057
 
 
1058
 
    def _run_server(self, s):
1059
 
        ssh_server = paramiko.Transport(s)
1060
 
        key_file = pathjoin(self._homedir, 'test_rsa.key')
1061
 
        f = open(key_file, 'w')
1062
 
        f.write(STUB_SERVER_KEY)
1063
 
        f.close()
1064
 
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
1065
 
        ssh_server.add_server_key(host_key)
1066
 
        server = self._server_interface(self)
1067
 
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
1068
 
                                         StubSFTPServer, root=self._root,
1069
 
                                         home=self._server_homedir)
1070
 
        event = threading.Event()
1071
 
        ssh_server.start_server(event, server)
1072
 
        event.wait(5.0)
1073
 
 
1074
 
    def setUp(self, backing_server=None):
1075
 
        # XXX: TODO: make sftpserver back onto backing_server rather than local
1076
 
        # disk.
1077
 
        if not (backing_server is None or
1078
 
                isinstance(backing_server, local.LocalURLServer)):
1079
 
            raise AssertionError(
1080
 
                "backing_server should not be %r, because this can only serve the "
1081
 
                "local current working directory." % (backing_server,))
1082
 
        self._original_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
1083
 
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._vendor
1084
 
        if sys.platform == 'win32':
1085
 
            # Win32 needs to use the UNICODE api
1086
 
            self._homedir = getcwd()
1087
 
        else:
1088
 
            # But Linux SFTP servers should just deal in bytestreams
1089
 
            self._homedir = os.getcwd()
1090
 
        if self._server_homedir is None:
1091
 
            self._server_homedir = self._homedir
1092
 
        self._root = '/'
1093
 
        if sys.platform == 'win32':
1094
 
            self._root = ''
1095
 
        self._listener = SocketListener(self._run_server_entry)
1096
 
        self._listener.setDaemon(True)
1097
 
        self._listener.start()
1098
 
 
1099
 
    def tearDown(self):
1100
 
        """See bzrlib.transport.Server.tearDown."""
1101
 
        self._listener.stop()
1102
 
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
1103
 
 
1104
 
    def get_bogus_url(self):
1105
 
        """See bzrlib.transport.Server.get_bogus_url."""
1106
 
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
1107
 
        # we bind a random socket, so that we get a guaranteed unused port
1108
 
        # we just never listen on that port
1109
 
        s = socket.socket()
1110
 
        s.bind(('localhost', 0))
1111
 
        return 'sftp://%s:%s/' % s.getsockname()
1112
 
 
1113
 
 
1114
 
class SFTPFullAbsoluteServer(SFTPServer):
1115
 
    """A test server for sftp transports, using absolute urls and ssh."""
1116
 
 
1117
 
    def get_url(self):
1118
 
        """See bzrlib.transport.Server.get_url."""
1119
 
        homedir = self._homedir
1120
 
        if sys.platform != 'win32':
1121
 
            # Remove the initial '/' on all platforms but win32
1122
 
            homedir = homedir[1:]
1123
 
        return self._get_sftp_url(urlutils.escape(homedir))
1124
 
 
1125
 
 
1126
 
class SFTPServerWithoutSSH(SFTPServer):
1127
 
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
1128
 
 
1129
 
    def __init__(self):
1130
 
        super(SFTPServerWithoutSSH, self).__init__()
1131
 
        self._vendor = ssh.LoopbackVendor()
1132
 
 
1133
 
    def _run_server(self, sock):
1134
 
        # Re-import these as locals, so that they're still accessible during
1135
 
        # interpreter shutdown (when all module globals get set to None, leading
1136
 
        # to confusing errors like "'NoneType' object has no attribute 'error'".
1137
 
        class FakeChannel(object):
1138
 
            def get_transport(self):
1139
 
                return self
1140
 
            def get_log_channel(self):
1141
 
                return 'paramiko'
1142
 
            def get_name(self):
1143
 
                return '1'
1144
 
            def get_hexdump(self):
1145
 
                return False
1146
 
            def close(self):
1147
 
                pass
1148
 
 
1149
 
        server = paramiko.SFTPServer(
1150
 
            FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
1151
 
            root=self._root, home=self._server_homedir)
1152
 
        try:
1153
 
            server.start_subsystem(
1154
 
                'sftp', None, ssh.SocketAsChannelAdapter(sock))
1155
 
        except socket.error, e:
1156
 
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
1157
 
                # it's okay for the client to disconnect abruptly
1158
 
                # (bug in paramiko 1.6: it should absorb this exception)
1159
 
                pass
1160
 
            else:
1161
 
                raise
1162
 
        except Exception, e:
1163
 
            # This typically seems to happen during interpreter shutdown, so
1164
 
            # most of the useful ways to report this error are won't work.
1165
 
            # Writing the exception type, and then the text of the exception,
1166
 
            # seems to be the best we can do.
1167
 
            import sys
1168
 
            sys.stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
1169
 
            sys.stderr.write('%s\n\n' % (e,))
1170
 
        server.finish_subsystem()
1171
 
 
1172
 
 
1173
 
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
1174
 
    """A test server for sftp transports, using absolute urls."""
1175
 
 
1176
 
    def get_url(self):
1177
 
        """See bzrlib.transport.Server.get_url."""
1178
 
        homedir = self._homedir
1179
 
        if sys.platform != 'win32':
1180
 
            # Remove the initial '/' on all platforms but win32
1181
 
            homedir = homedir[1:]
1182
 
        return self._get_sftp_url(urlutils.escape(homedir))
1183
 
 
1184
 
 
1185
 
class SFTPHomeDirServer(SFTPServerWithoutSSH):
1186
 
    """A test server for sftp transports, using homedir relative urls."""
1187
 
 
1188
 
    def get_url(self):
1189
 
        """See bzrlib.transport.Server.get_url."""
1190
 
        return self._get_sftp_url("~/")
1191
 
 
1192
 
 
1193
 
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
1194
 
    """A test server for sftp transports where only absolute paths will work.
1195
 
 
1196
 
    It does this by serving from a deeply-nested directory that doesn't exist.
1197
 
    """
1198
 
 
1199
 
    def setUp(self, backing_server=None):
1200
 
        self._server_homedir = '/dev/noone/runs/tests/here'
1201
 
        super(SFTPSiblingAbsoluteServer, self).setUp(backing_server)
1202
 
 
1203
 
 
1204
 
def get_test_permutations():
1205
 
    """Return the permutations to be used in testing."""
1206
 
    return [(SFTPTransport, SFTPAbsoluteServer),
1207
 
            (SFTPTransport, SFTPHomeDirServer),
1208
 
            (SFTPTransport, SFTPSiblingAbsoluteServer),
1209
 
            ]