~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: John Ferlito
  • Date: 2009-09-02 04:31:45 UTC
  • mto: (4665.7.1 serve-init)
  • mto: This revision was merged to the branch mainline in revision 4913.
  • Revision ID: johnf@inodes.org-20090902043145-gxdsfw03ilcwbyn5
Add a debian init script for bzr --serve

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
2
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
3
2
#
4
3
# This program is free software; you can redistribute it and/or modify
5
4
# it under the terms of the GNU General Public License as published by
13
12
#
14
13
# You should have received a copy of the GNU General Public License
15
14
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
16
 
18
17
"""Implementation of Transport over SFTP, using paramiko."""
19
18
 
24
23
# suite.  Those formats all date back to 0.7; so we should be able to remove
25
24
# these methods when we officially drop support for those formats.
26
25
 
 
26
import bisect
27
27
import errno
 
28
import itertools
28
29
import os
29
30
import random
30
31
import select
37
38
import warnings
38
39
 
39
40
from bzrlib import (
 
41
    config,
 
42
    debug,
40
43
    errors,
41
44
    urlutils,
42
45
    )
93
96
 
94
97
class SFTPLock(object):
95
98
    """This fakes a lock in a remote location.
96
 
    
 
99
 
97
100
    A present lock is indicated just by the existence of a file.  This
98
 
    doesn't work well on all transports and they are only used in 
 
101
    doesn't work well on all transports and they are only used in
99
102
    deprecated storage formats.
100
103
    """
101
 
    
 
104
 
102
105
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
103
106
 
104
107
    def __init__(self, path, transport):
131
134
            pass
132
135
 
133
136
 
 
137
class _SFTPReadvHelper(object):
 
138
    """A class to help with managing the state of a readv request."""
 
139
 
 
140
    # See _get_requests for an explanation.
 
141
    _max_request_size = 32768
 
142
 
 
143
    def __init__(self, original_offsets, relpath, _report_activity):
 
144
        """Create a new readv helper.
 
145
 
 
146
        :param original_offsets: The original requests given by the caller of
 
147
            readv()
 
148
        :param relpath: The name of the file (if known)
 
149
        :param _report_activity: A Transport._report_activity bound method,
 
150
            to be called as data arrives.
 
151
        """
 
152
        self.original_offsets = list(original_offsets)
 
153
        self.relpath = relpath
 
154
        self._report_activity = _report_activity
 
155
 
 
156
    def _get_requests(self):
 
157
        """Break up the offsets into individual requests over sftp.
 
158
 
 
159
        The SFTP spec only requires implementers to support 32kB requests. We
 
160
        could try something larger (openssh supports 64kB), but then we have to
 
161
        handle requests that fail.
 
162
        So instead, we just break up our maximum chunks into 32kB chunks, and
 
163
        asyncronously requests them.
 
164
        Newer versions of paramiko would do the chunking for us, but we want to
 
165
        start processing results right away, so we do it ourselves.
 
166
        """
 
167
        # TODO: Because we issue async requests, we don't 'fudge' any extra
 
168
        #       data.  I'm not 100% sure that is the best choice.
 
169
 
 
170
        # The first thing we do, is to collapse the individual requests as much
 
171
        # as possible, so we don't issues requests <32kB
 
172
        sorted_offsets = sorted(self.original_offsets)
 
173
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
 
174
                                                        limit=0, fudge_factor=0))
 
175
        requests = []
 
176
        for c_offset in coalesced:
 
177
            start = c_offset.start
 
178
            size = c_offset.length
 
179
 
 
180
            # Break this up into 32kB requests
 
181
            while size > 0:
 
182
                next_size = min(size, self._max_request_size)
 
183
                requests.append((start, next_size))
 
184
                size -= next_size
 
185
                start += next_size
 
186
        if 'sftp' in debug.debug_flags:
 
187
            mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
 
188
                self.relpath, len(sorted_offsets), len(coalesced),
 
189
                len(requests))
 
190
        return requests
 
191
 
 
192
    def request_and_yield_offsets(self, fp):
 
193
        """Request the data from the remote machine, yielding the results.
 
194
 
 
195
        :param fp: A Paramiko SFTPFile object that supports readv.
 
196
        :return: Yield the data requested by the original readv caller, one by
 
197
            one.
 
198
        """
 
199
        requests = self._get_requests()
 
200
        offset_iter = iter(self.original_offsets)
 
201
        cur_offset, cur_size = offset_iter.next()
 
202
        # paramiko .readv() yields strings that are in the order of the requests
 
203
        # So we track the current request to know where the next data is
 
204
        # being returned from.
 
205
        input_start = None
 
206
        last_end = None
 
207
        buffered_data = []
 
208
        buffered_len = 0
 
209
 
 
210
        # This is used to buffer chunks which we couldn't process yet
 
211
        # It is (start, end, data) tuples.
 
212
        data_chunks = []
 
213
        # Create an 'unlimited' data stream, so we stop based on requests,
 
214
        # rather than just because the data stream ended. This lets us detect
 
215
        # short readv.
 
216
        data_stream = itertools.chain(fp.readv(requests),
 
217
                                      itertools.repeat(None))
 
218
        for (start, length), data in itertools.izip(requests, data_stream):
 
219
            if data is None:
 
220
                if cur_coalesced is not None:
 
221
                    raise errors.ShortReadvError(self.relpath,
 
222
                        start, length, len(data))
 
223
            if len(data) != length:
 
224
                raise errors.ShortReadvError(self.relpath,
 
225
                    start, length, len(data))
 
226
            self._report_activity(length, 'read')
 
227
            if last_end is None:
 
228
                # This is the first request, just buffer it
 
229
                buffered_data = [data]
 
230
                buffered_len = length
 
231
                input_start = start
 
232
            elif start == last_end:
 
233
                # The data we are reading fits neatly on the previous
 
234
                # buffer, so this is all part of a larger coalesced range.
 
235
                buffered_data.append(data)
 
236
                buffered_len += length
 
237
            else:
 
238
                # We have an 'interrupt' in the data stream. So we know we are
 
239
                # at a request boundary.
 
240
                if buffered_len > 0:
 
241
                    # We haven't consumed the buffer so far, so put it into
 
242
                    # data_chunks, and continue.
 
243
                    buffered = ''.join(buffered_data)
 
244
                    data_chunks.append((input_start, buffered))
 
245
                input_start = start
 
246
                buffered_data = [data]
 
247
                buffered_len = length
 
248
            last_end = start + length
 
249
            if input_start == cur_offset and cur_size <= buffered_len:
 
250
                # Simplify the next steps a bit by transforming buffered_data
 
251
                # into a single string. We also have the nice property that
 
252
                # when there is only one string ''.join([x]) == x, so there is
 
253
                # no data copying.
 
254
                buffered = ''.join(buffered_data)
 
255
                # Clean out buffered data so that we keep memory
 
256
                # consumption low
 
257
                del buffered_data[:]
 
258
                buffered_offset = 0
 
259
                # TODO: We *could* also consider the case where cur_offset is in
 
260
                #       in the buffered range, even though it doesn't *start*
 
261
                #       the buffered range. But for packs we pretty much always
 
262
                #       read in order, so you won't get any extra data in the
 
263
                #       middle.
 
264
                while (input_start == cur_offset
 
265
                       and (buffered_offset + cur_size) <= buffered_len):
 
266
                    # We've buffered enough data to process this request, spit it
 
267
                    # out
 
268
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
 
269
                    # move the direct pointer into our buffered data
 
270
                    buffered_offset += cur_size
 
271
                    # Move the start-of-buffer pointer
 
272
                    input_start += cur_size
 
273
                    # Yield the requested data
 
274
                    yield cur_offset, cur_data
 
275
                    cur_offset, cur_size = offset_iter.next()
 
276
                # at this point, we've consumed as much of buffered as we can,
 
277
                # so break off the portion that we consumed
 
278
                if buffered_offset == len(buffered_data):
 
279
                    # No tail to leave behind
 
280
                    buffered_data = []
 
281
                    buffered_len = 0
 
282
                else:
 
283
                    buffered = buffered[buffered_offset:]
 
284
                    buffered_data = [buffered]
 
285
                    buffered_len = len(buffered)
 
286
        if buffered_len:
 
287
            buffered = ''.join(buffered_data)
 
288
            del buffered_data[:]
 
289
            data_chunks.append((input_start, buffered))
 
290
        if data_chunks:
 
291
            if 'sftp' in debug.debug_flags:
 
292
                mutter('SFTP readv left with %d out-of-order bytes',
 
293
                    sum(map(lambda x: len(x[1]), data_chunks)))
 
294
            # We've processed all the readv data, at this point, anything we
 
295
            # couldn't process is in data_chunks. This doesn't happen often, so
 
296
            # this code path isn't optimized
 
297
            # We use an interesting process for data_chunks
 
298
            # Specifically if we have "bisect_left([(start, len, entries)],
 
299
            #                                       (qstart,)])
 
300
            # If start == qstart, then we get the specific node. Otherwise we
 
301
            # get the previous node
 
302
            while True:
 
303
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
 
304
                if idx < len(data_chunks) and data_chunks[idx][0] == cur_offset:
 
305
                    # The data starts here
 
306
                    data = data_chunks[idx][1][:cur_size]
 
307
                elif idx > 0:
 
308
                    # The data is in a portion of a previous page
 
309
                    idx -= 1
 
310
                    sub_offset = cur_offset - data_chunks[idx][0]
 
311
                    data = data_chunks[idx][1]
 
312
                    data = data[sub_offset:sub_offset + cur_size]
 
313
                else:
 
314
                    # We are missing the page where the data should be found,
 
315
                    # something is wrong
 
316
                    data = ''
 
317
                if len(data) != cur_size:
 
318
                    raise AssertionError('We must have miscalulated.'
 
319
                        ' We expected %d bytes, but only found %d'
 
320
                        % (cur_size, len(data)))
 
321
                yield cur_offset, data
 
322
                cur_offset, cur_size = offset_iter.next()
 
323
 
 
324
 
134
325
class SFTPTransport(ConnectedTransport):
135
326
    """Transport implementation for SFTP access."""
136
327
 
158
349
 
159
350
    def _remote_path(self, relpath):
160
351
        """Return the path to be passed along the sftp protocol for relpath.
161
 
        
 
352
 
162
353
        :param relpath: is a urlencoded string.
163
354
        """
164
355
        relative = urlutils.unescape(relpath).encode('utf-8')
192
383
            password = credentials
193
384
 
194
385
        vendor = ssh._get_ssh_vendor()
 
386
        user = self._user
 
387
        if user is None:
 
388
            auth = config.AuthenticationConfig()
 
389
            user = auth.get_user('ssh', self._host, self._port)
195
390
        connection = vendor.connect_sftp(self._user, password,
196
391
                                         self._host, self._port)
197
 
        return connection, password
 
392
        return connection, (user, password)
198
393
 
199
394
    def _get_sftp(self):
200
395
        """Ensures that a connection is established"""
211
406
        """
212
407
        try:
213
408
            self._get_sftp().stat(self._remote_path(relpath))
 
409
            # stat result is about 20 bytes, let's say
 
410
            self._report_activity(20, 'read')
214
411
            return True
215
412
        except IOError:
216
413
            return False
217
414
 
218
415
    def get(self, relpath):
219
 
        """
220
 
        Get the file at the given relative path.
 
416
        """Get the file at the given relative path.
221
417
 
222
418
        :param relpath: The relative path to the file
223
419
        """
224
420
        try:
 
421
            # FIXME: by returning the file directly, we don't pass this
 
422
            # through to report_activity.  We could try wrapping the object
 
423
            # before it's returned.  For readv and get_bytes it's handled in
 
424
            # the higher-level function.
 
425
            # -- mbp 20090126
225
426
            path = self._remote_path(relpath)
226
427
            f = self._get_sftp().file(path, mode='rb')
227
428
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
231
432
            self._translate_io_exception(e, path, ': error retrieving',
232
433
                failure_exc=errors.ReadError)
233
434
 
 
435
    def get_bytes(self, relpath):
 
436
        # reimplement this here so that we can report how many bytes came back
 
437
        f = self.get(relpath)
 
438
        try:
 
439
            bytes = f.read()
 
440
            self._report_activity(len(bytes), 'read')
 
441
            return bytes
 
442
        finally:
 
443
            f.close()
 
444
 
234
445
    def _readv(self, relpath, offsets):
235
446
        """See Transport.readv()"""
236
447
        # We overload the default readv() because we want to use a file
245
456
            readv = getattr(fp, 'readv', None)
246
457
            if readv:
247
458
                return self._sftp_readv(fp, offsets, relpath)
248
 
            mutter('seek and read %s offsets', len(offsets))
 
459
            if 'sftp' in debug.debug_flags:
 
460
                mutter('seek and read %s offsets', len(offsets))
249
461
            return self._seek_and_read(fp, offsets, relpath)
250
462
        except (IOError, paramiko.SSHException), e:
251
463
            self._translate_io_exception(e, path, ': error retrieving')
258
470
        """
259
471
        return 64 * 1024
260
472
 
261
 
    def _sftp_readv(self, fp, offsets, relpath='<unknown>'):
 
473
    def _sftp_readv(self, fp, offsets, relpath):
262
474
        """Use the readv() member of fp to do async readv.
263
475
 
264
 
        And then read them using paramiko.readv(). paramiko.readv()
 
476
        Then read them using paramiko.readv(). paramiko.readv()
265
477
        does not support ranges > 64K, so it caps the request size, and
266
 
        just reads until it gets all the stuff it wants
 
478
        just reads until it gets all the stuff it wants.
267
479
        """
268
 
        offsets = list(offsets)
269
 
        sorted_offsets = sorted(offsets)
270
 
 
271
 
        # The algorithm works as follows:
272
 
        # 1) Coalesce nearby reads into a single chunk
273
 
        #    This generates a list of combined regions, the total size
274
 
        #    and the size of the sub regions. This coalescing step is limited
275
 
        #    in the number of nearby chunks to combine, and is allowed to
276
 
        #    skip small breaks in the requests. Limiting it makes sure that
277
 
        #    we can start yielding some data earlier, and skipping means we
278
 
        #    make fewer requests. (Beneficial even when using async)
279
 
        # 2) Break up this combined regions into chunks that are smaller
280
 
        #    than 64KiB. Technically the limit is 65536, but we are a
281
 
        #    little bit conservative. This is because sftp has a maximum
282
 
        #    return chunk size of 64KiB (max size of an unsigned short)
283
 
        # 3) Issue a readv() to paramiko to create an async request for
284
 
        #    all of this data
285
 
        # 4) Read in the data as it comes back, until we've read one
286
 
        #    continuous section as determined in step 1
287
 
        # 5) Break up the full sections into hunks for the original requested
288
 
        #    offsets. And put them in a cache
289
 
        # 6) Check if the next request is in the cache, and if it is, remove
290
 
        #    it from the cache, and yield its data. Continue until no more
291
 
        #    entries are in the cache.
292
 
        # 7) loop back to step 4 until all data has been read
293
 
        #
294
 
        # TODO: jam 20060725 This could be optimized one step further, by
295
 
        #       attempting to yield whatever data we have read, even before
296
 
        #       the first coallesced section has been fully processed.
297
 
 
298
 
        # When coalescing for use with readv(), we don't really need to
299
 
        # use any fudge factor, because the requests are made asynchronously
300
 
        coalesced = list(self._coalesce_offsets(sorted_offsets,
301
 
                               limit=self._max_readv_combine,
302
 
                               fudge_factor=0,
303
 
                               ))
304
 
        requests = []
305
 
        for c_offset in coalesced:
306
 
            start = c_offset.start
307
 
            size = c_offset.length
308
 
 
309
 
            # We need to break this up into multiple requests
310
 
            while size > 0:
311
 
                next_size = min(size, self._max_request_size)
312
 
                requests.append((start, next_size))
313
 
                size -= next_size
314
 
                start += next_size
315
 
 
316
 
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
317
 
                len(offsets), len(coalesced), len(requests))
318
 
 
319
 
        # Queue the current read until we have read the full coalesced section
320
 
        cur_data = []
321
 
        cur_data_len = 0
322
 
        cur_coalesced_stack = iter(coalesced)
323
 
        cur_coalesced = cur_coalesced_stack.next()
324
 
 
325
 
        # Cache the results, but only until they have been fulfilled
326
 
        data_map = {}
327
 
        # turn the list of offsets into a stack
328
 
        offset_stack = iter(offsets)
329
 
        cur_offset_and_size = offset_stack.next()
330
 
 
331
 
        for data in fp.readv(requests):
332
 
            cur_data += data
333
 
            cur_data_len += len(data)
334
 
 
335
 
            if cur_data_len < cur_coalesced.length:
336
 
                continue
337
 
            if cur_data_len != cur_coalesced.length:
338
 
                raise AssertionError(
339
 
                    "Somehow we read too much: %s != %s" 
340
 
                    % (cur_data_len, cur_coalesced.length))
341
 
            all_data = ''.join(cur_data)
342
 
            cur_data = []
343
 
            cur_data_len = 0
344
 
 
345
 
            for suboffset, subsize in cur_coalesced.ranges:
346
 
                key = (cur_coalesced.start+suboffset, subsize)
347
 
                data_map[key] = all_data[suboffset:suboffset+subsize]
348
 
 
349
 
            # Now that we've read some data, see if we can yield anything back
350
 
            while cur_offset_and_size in data_map:
351
 
                this_data = data_map.pop(cur_offset_and_size)
352
 
                yield cur_offset_and_size[0], this_data
353
 
                cur_offset_and_size = offset_stack.next()
354
 
 
355
 
            # We read a coalesced entry, so mark it as done
356
 
            cur_coalesced = None
357
 
            # Now that we've read all of the data for this coalesced section
358
 
            # on to the next
359
 
            cur_coalesced = cur_coalesced_stack.next()
360
 
 
361
 
        if cur_coalesced is not None:
362
 
            raise errors.ShortReadvError(relpath, cur_coalesced.start,
363
 
                cur_coalesced.length, len(data))
 
480
        helper = _SFTPReadvHelper(offsets, relpath, self._report_activity)
 
481
        return helper.request_and_yield_offsets(fp)
364
482
 
365
483
    def put_file(self, relpath, f, mode=None):
366
484
        """
391
509
            #      sticky bit. So it is probably best to stop chmodding, and
392
510
            #      just tell users that they need to set the umask correctly.
393
511
            #      The attr.st_mode = mode, in _sftp_open_exclusive
394
 
            #      will handle when the user wants the final mode to be more 
395
 
            #      restrictive. And then we avoid a round trip. Unless 
 
512
            #      will handle when the user wants the final mode to be more
 
513
            #      restrictive. And then we avoid a round trip. Unless
396
514
            #      paramiko decides to expose an async chmod()
397
515
 
398
516
            # This is designed to chmod() right before we close.
399
 
            # Because we set_pipelined() earlier, theoretically we might 
 
517
            # Because we set_pipelined() earlier, theoretically we might
400
518
            # avoid the round trip for fout.close()
401
519
            if mode is not None:
402
520
                self._get_sftp().chmod(tmp_abspath, mode)
444
562
                                                 ': unable to open')
445
563
 
446
564
                # This is designed to chmod() right before we close.
447
 
                # Because we set_pipelined() earlier, theoretically we might 
 
565
                # Because we set_pipelined() earlier, theoretically we might
448
566
                # avoid the round trip for fout.close()
449
567
                if mode is not None:
450
568
                    self._get_sftp().chmod(abspath, mode)
501
619
 
502
620
    def iter_files_recursive(self):
503
621
        """Walk the relative paths of all files in this transport."""
 
622
        # progress is handled by list_dir
504
623
        queue = list(self.list_dir('.'))
505
624
        while queue:
506
625
            relpath = queue.pop(0)
517
636
        else:
518
637
            local_mode = mode
519
638
        try:
 
639
            self._report_activity(len(abspath), 'write')
520
640
            self._get_sftp().mkdir(abspath, local_mode)
 
641
            self._report_activity(1, 'read')
521
642
            if mode is not None:
522
643
                # chmod a dir through sftp will erase any sgid bit set
523
644
                # on the server side.  So, if the bit mode are already
545
666
    def open_write_stream(self, relpath, mode=None):
546
667
        """See Transport.open_write_stream."""
547
668
        # initialise the file to zero-length
548
 
        # this is three round trips, but we don't use this 
549
 
        # api more than once per write_group at the moment so 
 
669
        # this is three round trips, but we don't use this
 
670
        # api more than once per write_group at the moment so
550
671
        # it is a tolerable overhead. Better would be to truncate
551
672
        # the file after opening. RBC 20070805
552
673
        self.put_bytes_non_atomic(relpath, "", mode)
575
696
        :param failure_exc: Paramiko has the super fun ability to raise completely
576
697
                           opaque errors that just set "e.args = ('Failure',)" with
577
698
                           no more information.
578
 
                           If this parameter is set, it defines the exception 
 
699
                           If this parameter is set, it defines the exception
579
700
                           to raise in these cases.
580
701
        """
581
702
        # paramiko seems to generate detailless errors.
590
711
            # strange but true, for the paramiko server.
591
712
            if (e.args == ('Failure',)):
592
713
                raise failure_exc(path, str(e) + more_info)
 
714
            # Can be something like args = ('Directory not empty:
 
715
            # '/srv/bazaar.launchpad.net/blah...: '
 
716
            # [Errno 39] Directory not empty',)
 
717
            if (e.args[0].startswith('Directory not empty: ')
 
718
                or getattr(e, 'errno', None) == errno.ENOTEMPTY):
 
719
                raise errors.DirectoryNotEmpty(path, str(e))
593
720
            mutter('Raising exception with args %s', e.args)
594
721
        if getattr(e, 'errno', None) is not None:
595
722
            mutter('Raising exception with errno %s', e.errno)
622
749
 
623
750
    def _rename_and_overwrite(self, abs_from, abs_to):
624
751
        """Do a fancy rename on the remote server.
625
 
        
 
752
 
626
753
        Using the implementation provided by osutils.
627
754
        """
628
755
        try:
647
774
            self._get_sftp().remove(path)
648
775
        except (IOError, paramiko.SSHException), e:
649
776
            self._translate_io_exception(e, path, ': unable to delete')
650
 
            
 
777
 
651
778
    def external_url(self):
652
779
        """See bzrlib.transport.Transport.external_url."""
653
780
        # the external path for SFTP is the base
668
795
        path = self._remote_path(relpath)
669
796
        try:
670
797
            entries = self._get_sftp().listdir(path)
 
798
            self._report_activity(sum(map(len, entries)), 'read')
671
799
        except (IOError, paramiko.SSHException), e:
672
800
            self._translate_io_exception(e, path, ': failed to list_dir')
673
801
        return [urlutils.escape(entry) for entry in entries]
730
858
        """
731
859
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
732
860
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
733
 
        #       However, there is no way to set the permission mode at open 
 
861
        #       However, there is no way to set the permission mode at open
734
862
        #       time using the sftp_client.file() functionality.
735
863
        path = self._get_sftp()._adjust_cwd(abspath)
736
864
        # mutter('sftp abspath %s => %s', abspath, path)
737
865
        attr = SFTPAttributes()
738
866
        if mode is not None:
739
867
            attr.st_mode = mode
740
 
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE 
 
868
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
741
869
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
742
870
        try:
743
871
            t, msg = self._get_sftp()._request(CMD_OPEN, path, omode, attr)
821
949
                # probably a failed test; unit test thread will log the
822
950
                # failure/error
823
951
                sys.excepthook(*sys.exc_info())
824
 
                warning('Exception from within unit test server thread: %r' % 
 
952
                warning('Exception from within unit test server thread: %r' %
825
953
                        x)
826
954
 
827
955
 
838
966
 
839
967
    Not all methods are implemented, this is deliberate as this class is not a
840
968
    replacement for the builtin sockets layer. fileno is not implemented to
841
 
    prevent the proxy being bypassed. 
 
969
    prevent the proxy being bypassed.
842
970
    """
843
971
 
844
972
    simulated_time = 0
846
974
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
847
975
        "setblocking", "setsockopt", "settimeout", "shutdown"])
848
976
 
849
 
    def __init__(self, sock, latency, bandwidth=1.0, 
 
977
    def __init__(self, sock, latency, bandwidth=1.0,
850
978
                 really_sleep=True):
851
 
        """ 
 
979
        """
852
980
        :param bandwith: simulated bandwith (MegaBit)
853
981
        :param really_sleep: If set to false, the SocketDelay will just
854
982
        increase a counter, instead of calling time.sleep. This is useful for
857
985
        self.sock = sock
858
986
        self.latency = latency
859
987
        self.really_sleep = really_sleep
860
 
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024) 
 
988
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
861
989
        self.new_roundtrip = False
862
990
 
863
991
    def sleep(self, s):
925
1053
 
926
1054
    def _run_server_entry(self, sock):
927
1055
        """Entry point for all implementations of _run_server.
928
 
        
 
1056
 
929
1057
        If self.add_latency is > 0.000001 then sock is given a latency adding
930
1058
        decorator.
931
1059
        """
948
1076
        event = threading.Event()
949
1077
        ssh_server.start_server(event, server)
950
1078
        event.wait(5.0)
951
 
    
 
1079
 
952
1080
    def setUp(self, backing_server=None):
953
1081
        # XXX: TODO: make sftpserver back onto backing_server rather than local
954
1082
        # disk.