~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: Mark Hammond
  • Date: 2008-08-10 11:16:39 UTC
  • mto: (3606.5.3 1.6)
  • mto: This revision was merged to the branch mainline in revision 3626.
  • Revision ID: mhammond@skippinet.com.au-20080810111639-9upz1wig1s8lvz62
install all files with 'restartreplace uninsrestartdelete' and close
tbzrcache at uninstall time.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
 
2
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
2
3
#
3
4
# This program is free software; you can redistribute it and/or modify
4
5
# it under the terms of the GNU General Public License as published by
12
13
#
13
14
# You should have received a copy of the GNU General Public License
14
15
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
17
 
17
18
"""Implementation of Transport over SFTP, using paramiko."""
18
19
 
23
24
# suite.  Those formats all date back to 0.7; so we should be able to remove
24
25
# these methods when we officially drop support for those formats.
25
26
 
26
 
import bisect
27
27
import errno
28
 
import itertools
29
28
import os
30
29
import random
31
30
import select
38
37
import warnings
39
38
 
40
39
from bzrlib import (
41
 
    config,
42
 
    debug,
43
40
    errors,
44
41
    urlutils,
45
42
    )
96
93
 
97
94
class SFTPLock(object):
98
95
    """This fakes a lock in a remote location.
99
 
 
 
96
    
100
97
    A present lock is indicated just by the existence of a file.  This
101
 
    doesn't work well on all transports and they are only used in
 
98
    doesn't work well on all transports and they are only used in 
102
99
    deprecated storage formats.
103
100
    """
104
 
 
 
101
    
105
102
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
106
103
 
107
104
    def __init__(self, path, transport):
134
131
            pass
135
132
 
136
133
 
137
 
class _SFTPReadvHelper(object):
138
 
    """A class to help with managing the state of a readv request."""
139
 
 
140
 
    # See _get_requests for an explanation.
141
 
    _max_request_size = 32768
142
 
 
143
 
    def __init__(self, original_offsets, relpath, _report_activity):
144
 
        """Create a new readv helper.
145
 
 
146
 
        :param original_offsets: The original requests given by the caller of
147
 
            readv()
148
 
        :param relpath: The name of the file (if known)
149
 
        :param _report_activity: A Transport._report_activity bound method,
150
 
            to be called as data arrives.
151
 
        """
152
 
        self.original_offsets = list(original_offsets)
153
 
        self.relpath = relpath
154
 
        self._report_activity = _report_activity
155
 
 
156
 
    def _get_requests(self):
157
 
        """Break up the offsets into individual requests over sftp.
158
 
 
159
 
        The SFTP spec only requires implementers to support 32kB requests. We
160
 
        could try something larger (openssh supports 64kB), but then we have to
161
 
        handle requests that fail.
162
 
        So instead, we just break up our maximum chunks into 32kB chunks, and
163
 
        asyncronously requests them.
164
 
        Newer versions of paramiko would do the chunking for us, but we want to
165
 
        start processing results right away, so we do it ourselves.
166
 
        """
167
 
        # TODO: Because we issue async requests, we don't 'fudge' any extra
168
 
        #       data.  I'm not 100% sure that is the best choice.
169
 
 
170
 
        # The first thing we do, is to collapse the individual requests as much
171
 
        # as possible, so we don't issues requests <32kB
172
 
        sorted_offsets = sorted(self.original_offsets)
173
 
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
174
 
                                                        limit=0, fudge_factor=0))
175
 
        requests = []
176
 
        for c_offset in coalesced:
177
 
            start = c_offset.start
178
 
            size = c_offset.length
179
 
 
180
 
            # Break this up into 32kB requests
181
 
            while size > 0:
182
 
                next_size = min(size, self._max_request_size)
183
 
                requests.append((start, next_size))
184
 
                size -= next_size
185
 
                start += next_size
186
 
        if 'sftp' in debug.debug_flags:
187
 
            mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
188
 
                self.relpath, len(sorted_offsets), len(coalesced),
189
 
                len(requests))
190
 
        return requests
191
 
 
192
 
    def request_and_yield_offsets(self, fp):
193
 
        """Request the data from the remote machine, yielding the results.
194
 
 
195
 
        :param fp: A Paramiko SFTPFile object that supports readv.
196
 
        :return: Yield the data requested by the original readv caller, one by
197
 
            one.
198
 
        """
199
 
        requests = self._get_requests()
200
 
        offset_iter = iter(self.original_offsets)
201
 
        cur_offset, cur_size = offset_iter.next()
202
 
        # paramiko .readv() yields strings that are in the order of the requests
203
 
        # So we track the current request to know where the next data is
204
 
        # being returned from.
205
 
        input_start = None
206
 
        last_end = None
207
 
        buffered_data = []
208
 
        buffered_len = 0
209
 
 
210
 
        # This is used to buffer chunks which we couldn't process yet
211
 
        # It is (start, end, data) tuples.
212
 
        data_chunks = []
213
 
        # Create an 'unlimited' data stream, so we stop based on requests,
214
 
        # rather than just because the data stream ended. This lets us detect
215
 
        # short readv.
216
 
        data_stream = itertools.chain(fp.readv(requests),
217
 
                                      itertools.repeat(None))
218
 
        for (start, length), data in itertools.izip(requests, data_stream):
219
 
            if data is None:
220
 
                if cur_coalesced is not None:
221
 
                    raise errors.ShortReadvError(self.relpath,
222
 
                        start, length, len(data))
223
 
            if len(data) != length:
224
 
                raise errors.ShortReadvError(self.relpath,
225
 
                    start, length, len(data))
226
 
            self._report_activity(length, 'read')
227
 
            if last_end is None:
228
 
                # This is the first request, just buffer it
229
 
                buffered_data = [data]
230
 
                buffered_len = length
231
 
                input_start = start
232
 
            elif start == last_end:
233
 
                # The data we are reading fits neatly on the previous
234
 
                # buffer, so this is all part of a larger coalesced range.
235
 
                buffered_data.append(data)
236
 
                buffered_len += length
237
 
            else:
238
 
                # We have an 'interrupt' in the data stream. So we know we are
239
 
                # at a request boundary.
240
 
                if buffered_len > 0:
241
 
                    # We haven't consumed the buffer so far, so put it into
242
 
                    # data_chunks, and continue.
243
 
                    buffered = ''.join(buffered_data)
244
 
                    data_chunks.append((input_start, buffered))
245
 
                input_start = start
246
 
                buffered_data = [data]
247
 
                buffered_len = length
248
 
            last_end = start + length
249
 
            if input_start == cur_offset and cur_size <= buffered_len:
250
 
                # Simplify the next steps a bit by transforming buffered_data
251
 
                # into a single string. We also have the nice property that
252
 
                # when there is only one string ''.join([x]) == x, so there is
253
 
                # no data copying.
254
 
                buffered = ''.join(buffered_data)
255
 
                # Clean out buffered data so that we keep memory
256
 
                # consumption low
257
 
                del buffered_data[:]
258
 
                buffered_offset = 0
259
 
                # TODO: We *could* also consider the case where cur_offset is in
260
 
                #       in the buffered range, even though it doesn't *start*
261
 
                #       the buffered range. But for packs we pretty much always
262
 
                #       read in order, so you won't get any extra data in the
263
 
                #       middle.
264
 
                while (input_start == cur_offset
265
 
                       and (buffered_offset + cur_size) <= buffered_len):
266
 
                    # We've buffered enough data to process this request, spit it
267
 
                    # out
268
 
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
269
 
                    # move the direct pointer into our buffered data
270
 
                    buffered_offset += cur_size
271
 
                    # Move the start-of-buffer pointer
272
 
                    input_start += cur_size
273
 
                    # Yield the requested data
274
 
                    yield cur_offset, cur_data
275
 
                    cur_offset, cur_size = offset_iter.next()
276
 
                # at this point, we've consumed as much of buffered as we can,
277
 
                # so break off the portion that we consumed
278
 
                if buffered_offset == len(buffered_data):
279
 
                    # No tail to leave behind
280
 
                    buffered_data = []
281
 
                    buffered_len = 0
282
 
                else:
283
 
                    buffered = buffered[buffered_offset:]
284
 
                    buffered_data = [buffered]
285
 
                    buffered_len = len(buffered)
286
 
        if buffered_len:
287
 
            buffered = ''.join(buffered_data)
288
 
            del buffered_data[:]
289
 
            data_chunks.append((input_start, buffered))
290
 
        if data_chunks:
291
 
            if 'sftp' in debug.debug_flags:
292
 
                mutter('SFTP readv left with %d out-of-order bytes',
293
 
                    sum(map(lambda x: len(x[1]), data_chunks)))
294
 
            # We've processed all the readv data, at this point, anything we
295
 
            # couldn't process is in data_chunks. This doesn't happen often, so
296
 
            # this code path isn't optimized
297
 
            # We use an interesting process for data_chunks
298
 
            # Specifically if we have "bisect_left([(start, len, entries)],
299
 
            #                                       (qstart,)])
300
 
            # If start == qstart, then we get the specific node. Otherwise we
301
 
            # get the previous node
302
 
            while True:
303
 
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
304
 
                if idx < len(data_chunks) and data_chunks[idx][0] == cur_offset:
305
 
                    # The data starts here
306
 
                    data = data_chunks[idx][1][:cur_size]
307
 
                elif idx > 0:
308
 
                    # The data is in a portion of a previous page
309
 
                    idx -= 1
310
 
                    sub_offset = cur_offset - data_chunks[idx][0]
311
 
                    data = data_chunks[idx][1]
312
 
                    data = data[sub_offset:sub_offset + cur_size]
313
 
                else:
314
 
                    # We are missing the page where the data should be found,
315
 
                    # something is wrong
316
 
                    data = ''
317
 
                if len(data) != cur_size:
318
 
                    raise AssertionError('We must have miscalulated.'
319
 
                        ' We expected %d bytes, but only found %d'
320
 
                        % (cur_size, len(data)))
321
 
                yield cur_offset, data
322
 
                cur_offset, cur_size = offset_iter.next()
323
 
 
324
 
 
325
134
class SFTPTransport(ConnectedTransport):
326
135
    """Transport implementation for SFTP access."""
327
136
 
349
158
 
350
159
    def _remote_path(self, relpath):
351
160
        """Return the path to be passed along the sftp protocol for relpath.
352
 
 
 
161
        
353
162
        :param relpath: is a urlencoded string.
354
163
        """
355
164
        relative = urlutils.unescape(relpath).encode('utf-8')
383
192
            password = credentials
384
193
 
385
194
        vendor = ssh._get_ssh_vendor()
386
 
        user = self._user
387
 
        if user is None:
388
 
            auth = config.AuthenticationConfig()
389
 
            user = auth.get_user('ssh', self._host, self._port)
390
195
        connection = vendor.connect_sftp(self._user, password,
391
196
                                         self._host, self._port)
392
 
        return connection, (user, password)
 
197
        return connection, password
393
198
 
394
199
    def _get_sftp(self):
395
200
        """Ensures that a connection is established"""
406
211
        """
407
212
        try:
408
213
            self._get_sftp().stat(self._remote_path(relpath))
409
 
            # stat result is about 20 bytes, let's say
410
 
            self._report_activity(20, 'read')
411
214
            return True
412
215
        except IOError:
413
216
            return False
414
217
 
415
218
    def get(self, relpath):
416
 
        """Get the file at the given relative path.
 
219
        """
 
220
        Get the file at the given relative path.
417
221
 
418
222
        :param relpath: The relative path to the file
419
223
        """
420
224
        try:
421
 
            # FIXME: by returning the file directly, we don't pass this
422
 
            # through to report_activity.  We could try wrapping the object
423
 
            # before it's returned.  For readv and get_bytes it's handled in
424
 
            # the higher-level function.
425
 
            # -- mbp 20090126
426
225
            path = self._remote_path(relpath)
427
226
            f = self._get_sftp().file(path, mode='rb')
428
227
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
432
231
            self._translate_io_exception(e, path, ': error retrieving',
433
232
                failure_exc=errors.ReadError)
434
233
 
435
 
    def get_bytes(self, relpath):
436
 
        # reimplement this here so that we can report how many bytes came back
437
 
        f = self.get(relpath)
438
 
        try:
439
 
            bytes = f.read()
440
 
            self._report_activity(len(bytes), 'read')
441
 
            return bytes
442
 
        finally:
443
 
            f.close()
444
 
 
445
234
    def _readv(self, relpath, offsets):
446
235
        """See Transport.readv()"""
447
236
        # We overload the default readv() because we want to use a file
456
245
            readv = getattr(fp, 'readv', None)
457
246
            if readv:
458
247
                return self._sftp_readv(fp, offsets, relpath)
459
 
            if 'sftp' in debug.debug_flags:
460
 
                mutter('seek and read %s offsets', len(offsets))
 
248
            mutter('seek and read %s offsets', len(offsets))
461
249
            return self._seek_and_read(fp, offsets, relpath)
462
250
        except (IOError, paramiko.SSHException), e:
463
251
            self._translate_io_exception(e, path, ': error retrieving')
470
258
        """
471
259
        return 64 * 1024
472
260
 
473
 
    def _sftp_readv(self, fp, offsets, relpath):
 
261
    def _sftp_readv(self, fp, offsets, relpath='<unknown>'):
474
262
        """Use the readv() member of fp to do async readv.
475
263
 
476
 
        Then read them using paramiko.readv(). paramiko.readv()
 
264
        And then read them using paramiko.readv(). paramiko.readv()
477
265
        does not support ranges > 64K, so it caps the request size, and
478
 
        just reads until it gets all the stuff it wants.
 
266
        just reads until it gets all the stuff it wants
479
267
        """
480
 
        helper = _SFTPReadvHelper(offsets, relpath, self._report_activity)
481
 
        return helper.request_and_yield_offsets(fp)
 
268
        offsets = list(offsets)
 
269
        sorted_offsets = sorted(offsets)
 
270
 
 
271
        # The algorithm works as follows:
 
272
        # 1) Coalesce nearby reads into a single chunk
 
273
        #    This generates a list of combined regions, the total size
 
274
        #    and the size of the sub regions. This coalescing step is limited
 
275
        #    in the number of nearby chunks to combine, and is allowed to
 
276
        #    skip small breaks in the requests. Limiting it makes sure that
 
277
        #    we can start yielding some data earlier, and skipping means we
 
278
        #    make fewer requests. (Beneficial even when using async)
 
279
        # 2) Break up this combined regions into chunks that are smaller
 
280
        #    than 64KiB. Technically the limit is 65536, but we are a
 
281
        #    little bit conservative. This is because sftp has a maximum
 
282
        #    return chunk size of 64KiB (max size of an unsigned short)
 
283
        # 3) Issue a readv() to paramiko to create an async request for
 
284
        #    all of this data
 
285
        # 4) Read in the data as it comes back, until we've read one
 
286
        #    continuous section as determined in step 1
 
287
        # 5) Break up the full sections into hunks for the original requested
 
288
        #    offsets. And put them in a cache
 
289
        # 6) Check if the next request is in the cache, and if it is, remove
 
290
        #    it from the cache, and yield its data. Continue until no more
 
291
        #    entries are in the cache.
 
292
        # 7) loop back to step 4 until all data has been read
 
293
        #
 
294
        # TODO: jam 20060725 This could be optimized one step further, by
 
295
        #       attempting to yield whatever data we have read, even before
 
296
        #       the first coallesced section has been fully processed.
 
297
 
 
298
        # When coalescing for use with readv(), we don't really need to
 
299
        # use any fudge factor, because the requests are made asynchronously
 
300
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
301
                               limit=self._max_readv_combine,
 
302
                               fudge_factor=0,
 
303
                               ))
 
304
        requests = []
 
305
        for c_offset in coalesced:
 
306
            start = c_offset.start
 
307
            size = c_offset.length
 
308
 
 
309
            # We need to break this up into multiple requests
 
310
            while size > 0:
 
311
                next_size = min(size, self._max_request_size)
 
312
                requests.append((start, next_size))
 
313
                size -= next_size
 
314
                start += next_size
 
315
 
 
316
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
 
317
                len(offsets), len(coalesced), len(requests))
 
318
 
 
319
        # Queue the current read until we have read the full coalesced section
 
320
        cur_data = []
 
321
        cur_data_len = 0
 
322
        cur_coalesced_stack = iter(coalesced)
 
323
        cur_coalesced = cur_coalesced_stack.next()
 
324
 
 
325
        # Cache the results, but only until they have been fulfilled
 
326
        data_map = {}
 
327
        # turn the list of offsets into a stack
 
328
        offset_stack = iter(offsets)
 
329
        cur_offset_and_size = offset_stack.next()
 
330
 
 
331
        for data in fp.readv(requests):
 
332
            cur_data += data
 
333
            cur_data_len += len(data)
 
334
 
 
335
            if cur_data_len < cur_coalesced.length:
 
336
                continue
 
337
            if cur_data_len != cur_coalesced.length:
 
338
                raise AssertionError(
 
339
                    "Somehow we read too much: %s != %s" 
 
340
                    % (cur_data_len, cur_coalesced.length))
 
341
            all_data = ''.join(cur_data)
 
342
            cur_data = []
 
343
            cur_data_len = 0
 
344
 
 
345
            for suboffset, subsize in cur_coalesced.ranges:
 
346
                key = (cur_coalesced.start+suboffset, subsize)
 
347
                data_map[key] = all_data[suboffset:suboffset+subsize]
 
348
 
 
349
            # Now that we've read some data, see if we can yield anything back
 
350
            while cur_offset_and_size in data_map:
 
351
                this_data = data_map.pop(cur_offset_and_size)
 
352
                yield cur_offset_and_size[0], this_data
 
353
                cur_offset_and_size = offset_stack.next()
 
354
 
 
355
            # We read a coalesced entry, so mark it as done
 
356
            cur_coalesced = None
 
357
            # Now that we've read all of the data for this coalesced section
 
358
            # on to the next
 
359
            cur_coalesced = cur_coalesced_stack.next()
 
360
 
 
361
        if cur_coalesced is not None:
 
362
            raise errors.ShortReadvError(relpath, cur_coalesced.start,
 
363
                cur_coalesced.length, len(data))
482
364
 
483
365
    def put_file(self, relpath, f, mode=None):
484
366
        """
509
391
            #      sticky bit. So it is probably best to stop chmodding, and
510
392
            #      just tell users that they need to set the umask correctly.
511
393
            #      The attr.st_mode = mode, in _sftp_open_exclusive
512
 
            #      will handle when the user wants the final mode to be more
513
 
            #      restrictive. And then we avoid a round trip. Unless
 
394
            #      will handle when the user wants the final mode to be more 
 
395
            #      restrictive. And then we avoid a round trip. Unless 
514
396
            #      paramiko decides to expose an async chmod()
515
397
 
516
398
            # This is designed to chmod() right before we close.
517
 
            # Because we set_pipelined() earlier, theoretically we might
 
399
            # Because we set_pipelined() earlier, theoretically we might 
518
400
            # avoid the round trip for fout.close()
519
401
            if mode is not None:
520
402
                self._get_sftp().chmod(tmp_abspath, mode)
562
444
                                                 ': unable to open')
563
445
 
564
446
                # This is designed to chmod() right before we close.
565
 
                # Because we set_pipelined() earlier, theoretically we might
 
447
                # Because we set_pipelined() earlier, theoretically we might 
566
448
                # avoid the round trip for fout.close()
567
449
                if mode is not None:
568
450
                    self._get_sftp().chmod(abspath, mode)
619
501
 
620
502
    def iter_files_recursive(self):
621
503
        """Walk the relative paths of all files in this transport."""
622
 
        # progress is handled by list_dir
623
504
        queue = list(self.list_dir('.'))
624
505
        while queue:
625
506
            relpath = queue.pop(0)
636
517
        else:
637
518
            local_mode = mode
638
519
        try:
639
 
            self._report_activity(len(abspath), 'write')
640
520
            self._get_sftp().mkdir(abspath, local_mode)
641
 
            self._report_activity(1, 'read')
642
521
            if mode is not None:
643
522
                # chmod a dir through sftp will erase any sgid bit set
644
523
                # on the server side.  So, if the bit mode are already
666
545
    def open_write_stream(self, relpath, mode=None):
667
546
        """See Transport.open_write_stream."""
668
547
        # initialise the file to zero-length
669
 
        # this is three round trips, but we don't use this
670
 
        # api more than once per write_group at the moment so
 
548
        # this is three round trips, but we don't use this 
 
549
        # api more than once per write_group at the moment so 
671
550
        # it is a tolerable overhead. Better would be to truncate
672
551
        # the file after opening. RBC 20070805
673
552
        self.put_bytes_non_atomic(relpath, "", mode)
696
575
        :param failure_exc: Paramiko has the super fun ability to raise completely
697
576
                           opaque errors that just set "e.args = ('Failure',)" with
698
577
                           no more information.
699
 
                           If this parameter is set, it defines the exception
 
578
                           If this parameter is set, it defines the exception 
700
579
                           to raise in these cases.
701
580
        """
702
581
        # paramiko seems to generate detailless errors.
705
584
            if (e.args == ('No such file or directory',) or
706
585
                e.args == ('No such file',)):
707
586
                raise NoSuchFile(path, str(e) + more_info)
708
 
            if (e.args == ('mkdir failed',) or
709
 
                e.args[0].startswith('syserr: File exists')):
 
587
            if (e.args == ('mkdir failed',)):
710
588
                raise FileExists(path, str(e) + more_info)
711
589
            # strange but true, for the paramiko server.
712
590
            if (e.args == ('Failure',)):
713
591
                raise failure_exc(path, str(e) + more_info)
714
 
            # Can be something like args = ('Directory not empty:
715
 
            # '/srv/bazaar.launchpad.net/blah...: '
716
 
            # [Errno 39] Directory not empty',)
717
 
            if (e.args[0].startswith('Directory not empty: ')
718
 
                or getattr(e, 'errno', None) == errno.ENOTEMPTY):
719
 
                raise errors.DirectoryNotEmpty(path, str(e))
720
592
            mutter('Raising exception with args %s', e.args)
721
593
        if getattr(e, 'errno', None) is not None:
722
594
            mutter('Raising exception with errno %s', e.errno)
749
621
 
750
622
    def _rename_and_overwrite(self, abs_from, abs_to):
751
623
        """Do a fancy rename on the remote server.
752
 
 
 
624
        
753
625
        Using the implementation provided by osutils.
754
626
        """
755
627
        try:
774
646
            self._get_sftp().remove(path)
775
647
        except (IOError, paramiko.SSHException), e:
776
648
            self._translate_io_exception(e, path, ': unable to delete')
777
 
 
 
649
            
778
650
    def external_url(self):
779
651
        """See bzrlib.transport.Transport.external_url."""
780
652
        # the external path for SFTP is the base
795
667
        path = self._remote_path(relpath)
796
668
        try:
797
669
            entries = self._get_sftp().listdir(path)
798
 
            self._report_activity(sum(map(len, entries)), 'read')
799
670
        except (IOError, paramiko.SSHException), e:
800
671
            self._translate_io_exception(e, path, ': failed to list_dir')
801
672
        return [urlutils.escape(entry) for entry in entries]
858
729
        """
859
730
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
860
731
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
861
 
        #       However, there is no way to set the permission mode at open
 
732
        #       However, there is no way to set the permission mode at open 
862
733
        #       time using the sftp_client.file() functionality.
863
734
        path = self._get_sftp()._adjust_cwd(abspath)
864
735
        # mutter('sftp abspath %s => %s', abspath, path)
865
736
        attr = SFTPAttributes()
866
737
        if mode is not None:
867
738
            attr.st_mode = mode
868
 
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
 
739
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE 
869
740
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
870
741
        try:
871
742
            t, msg = self._get_sftp()._request(CMD_OPEN, path, omode, attr)
917
788
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
918
789
        self._socket.bind(('localhost', 0))
919
790
        self._socket.listen(1)
920
 
        self.host, self.port = self._socket.getsockname()[:2]
 
791
        self.port = self._socket.getsockname()[1]
921
792
        self._stop_event = threading.Event()
922
793
 
923
794
    def stop(self):
949
820
                # probably a failed test; unit test thread will log the
950
821
                # failure/error
951
822
                sys.excepthook(*sys.exc_info())
952
 
                warning('Exception from within unit test server thread: %r' %
 
823
                warning('Exception from within unit test server thread: %r' % 
953
824
                        x)
954
825
 
955
826
 
966
837
 
967
838
    Not all methods are implemented, this is deliberate as this class is not a
968
839
    replacement for the builtin sockets layer. fileno is not implemented to
969
 
    prevent the proxy being bypassed.
 
840
    prevent the proxy being bypassed. 
970
841
    """
971
842
 
972
843
    simulated_time = 0
974
845
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
975
846
        "setblocking", "setsockopt", "settimeout", "shutdown"])
976
847
 
977
 
    def __init__(self, sock, latency, bandwidth=1.0,
 
848
    def __init__(self, sock, latency, bandwidth=1.0, 
978
849
                 really_sleep=True):
979
 
        """
 
850
        """ 
980
851
        :param bandwith: simulated bandwith (MegaBit)
981
852
        :param really_sleep: If set to false, the SocketDelay will just
982
853
        increase a counter, instead of calling time.sleep. This is useful for
985
856
        self.sock = sock
986
857
        self.latency = latency
987
858
        self.really_sleep = really_sleep
988
 
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
 
859
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024) 
989
860
        self.new_roundtrip = False
990
861
 
991
862
    def sleep(self, s):
1045
916
 
1046
917
    def _get_sftp_url(self, path):
1047
918
        """Calculate an sftp url to this server for path."""
1048
 
        return 'sftp://foo:bar@%s:%d/%s' % (self._listener.host,
1049
 
                                            self._listener.port, path)
 
919
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
1050
920
 
1051
921
    def log(self, message):
1052
922
        """StubServer uses this to log when a new server is created."""
1054
924
 
1055
925
    def _run_server_entry(self, sock):
1056
926
        """Entry point for all implementations of _run_server.
1057
 
 
 
927
        
1058
928
        If self.add_latency is > 0.000001 then sock is given a latency adding
1059
929
        decorator.
1060
930
        """
1077
947
        event = threading.Event()
1078
948
        ssh_server.start_server(event, server)
1079
949
        event.wait(5.0)
1080
 
 
1081
 
    def start_server(self, backing_server=None):
 
950
    
 
951
    def setUp(self, backing_server=None):
1082
952
        # XXX: TODO: make sftpserver back onto backing_server rather than local
1083
953
        # disk.
1084
954
        if not (backing_server is None or
1103
973
        self._listener.setDaemon(True)
1104
974
        self._listener.start()
1105
975
 
1106
 
    def stop_server(self):
 
976
    def tearDown(self):
 
977
        """See bzrlib.transport.Server.tearDown."""
1107
978
        self._listener.stop()
1108
979
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
1109
980
 
1202
1073
    It does this by serving from a deeply-nested directory that doesn't exist.
1203
1074
    """
1204
1075
 
1205
 
    def start_server(self, backing_server=None):
 
1076
    def setUp(self, backing_server=None):
1206
1077
        self._server_homedir = '/dev/noone/runs/tests/here'
1207
 
        super(SFTPSiblingAbsoluteServer, self).start_server(backing_server)
 
1078
        super(SFTPSiblingAbsoluteServer, self).setUp(backing_server)
1208
1079
 
1209
1080
 
1210
1081
def get_test_permutations():