~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: Patch Queue Manager
  • Date: 2016-02-01 19:13:13 UTC
  • mfrom: (6614.2.2 trunk)
  • Revision ID: pqm@pqm.ubuntu.com-20160201191313-wdfvmfff1djde6oq
(vila) Release 2.7.0 (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
2
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2016 Canonical Ltd
3
2
#
4
3
# This program is free software; you can redistribute it and/or modify
5
4
# it under the terms of the GNU General Public License as published by
13
12
#
14
13
# You should have received a copy of the GNU General Public License
15
14
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
16
 
18
17
"""Implementation of Transport over SFTP, using paramiko."""
19
18
 
 
19
from __future__ import absolute_import
 
20
 
20
21
# TODO: Remove the transport-based lock_read and lock_write methods.  They'll
21
22
# then raise TransportNotPossible, which will break remote access to any
22
23
# formats which rely on OS-level locks.  That should be fine as those formats
24
25
# suite.  Those formats all date back to 0.7; so we should be able to remove
25
26
# these methods when we officially drop support for those formats.
26
27
 
 
28
import bisect
27
29
import errno
 
30
import itertools
28
31
import os
29
32
import random
30
 
import select
31
 
import socket
32
33
import stat
33
34
import sys
34
35
import time
35
 
import urllib
36
 
import urlparse
37
36
import warnings
38
37
 
39
38
from bzrlib import (
 
39
    config,
 
40
    debug,
40
41
    errors,
41
42
    urlutils,
42
43
    )
43
44
from bzrlib.errors import (FileExists,
44
 
                           NoSuchFile, PathNotChild,
 
45
                           NoSuchFile,
45
46
                           TransportError,
46
47
                           LockError,
47
48
                           PathError,
48
49
                           ParamikoNotPresent,
49
50
                           )
50
 
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
51
 
from bzrlib.symbol_versioning import (
52
 
        deprecated_function,
53
 
        zero_ninety,
54
 
        )
 
51
from bzrlib.osutils import fancy_rename
55
52
from bzrlib.trace import mutter, warning
56
53
from bzrlib.transport import (
57
54
    FileFileStream,
58
55
    _file_streams,
59
 
    local,
60
 
    register_urlparse_netloc_protocol,
61
 
    Server,
62
56
    ssh,
63
57
    ConnectedTransport,
64
58
    )
83
77
else:
84
78
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
85
79
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
86
 
                               CMD_HANDLE, CMD_OPEN)
 
80
                               SFTP_OK, CMD_HANDLE, CMD_OPEN)
87
81
    from paramiko.sftp_attr import SFTPAttributes
88
82
    from paramiko.sftp_file import SFTPFile
89
83
 
90
84
 
91
 
register_urlparse_netloc_protocol('sftp')
92
 
 
93
 
 
94
85
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
95
86
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
96
87
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
97
88
 
98
89
 
99
 
@deprecated_function(zero_ninety)
100
 
def clear_connection_cache():
101
 
    """Remove all hosts from the SFTP connection cache.
102
 
 
103
 
    Primarily useful for test cases wanting to force garbage collection.
104
 
    We don't have a global connection cache anymore.
105
 
    """
106
 
 
107
90
class SFTPLock(object):
108
91
    """This fakes a lock in a remote location.
109
 
    
 
92
 
110
93
    A present lock is indicated just by the existence of a file.  This
111
 
    doesn't work well on all transports and they are only used in 
 
94
    doesn't work well on all transports and they are only used in
112
95
    deprecated storage formats.
113
96
    """
114
 
    
 
97
 
115
98
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
116
99
 
117
100
    def __init__(self, path, transport):
118
 
        assert isinstance(transport, SFTPTransport)
119
 
 
120
101
        self.lock_file = None
121
102
        self.path = path
122
103
        self.lock_path = path + '.write-lock'
128
109
        except FileExists:
129
110
            raise LockError('File %r already locked' % (self.path,))
130
111
 
131
 
    def __del__(self):
132
 
        """Should this warn, or actually try to cleanup?"""
133
 
        if self.lock_file:
134
 
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))
135
 
            self.unlock()
136
 
 
137
112
    def unlock(self):
138
113
        if not self.lock_file:
139
114
            return
146
121
            pass
147
122
 
148
123
 
 
124
class _SFTPReadvHelper(object):
 
125
    """A class to help with managing the state of a readv request."""
 
126
 
 
127
    # See _get_requests for an explanation.
 
128
    _max_request_size = 32768
 
129
 
 
130
    def __init__(self, original_offsets, relpath, _report_activity):
 
131
        """Create a new readv helper.
 
132
 
 
133
        :param original_offsets: The original requests given by the caller of
 
134
            readv()
 
135
        :param relpath: The name of the file (if known)
 
136
        :param _report_activity: A Transport._report_activity bound method,
 
137
            to be called as data arrives.
 
138
        """
 
139
        self.original_offsets = list(original_offsets)
 
140
        self.relpath = relpath
 
141
        self._report_activity = _report_activity
 
142
 
 
143
    def _get_requests(self):
 
144
        """Break up the offsets into individual requests over sftp.
 
145
 
 
146
        The SFTP spec only requires implementers to support 32kB requests. We
 
147
        could try something larger (openssh supports 64kB), but then we have to
 
148
        handle requests that fail.
 
149
        So instead, we just break up our maximum chunks into 32kB chunks, and
 
150
        asyncronously requests them.
 
151
        Newer versions of paramiko would do the chunking for us, but we want to
 
152
        start processing results right away, so we do it ourselves.
 
153
        """
 
154
        # TODO: Because we issue async requests, we don't 'fudge' any extra
 
155
        #       data.  I'm not 100% sure that is the best choice.
 
156
 
 
157
        # The first thing we do, is to collapse the individual requests as much
 
158
        # as possible, so we don't issues requests <32kB
 
159
        sorted_offsets = sorted(self.original_offsets)
 
160
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
 
161
                                                        limit=0, fudge_factor=0))
 
162
        requests = []
 
163
        for c_offset in coalesced:
 
164
            start = c_offset.start
 
165
            size = c_offset.length
 
166
 
 
167
            # Break this up into 32kB requests
 
168
            while size > 0:
 
169
                next_size = min(size, self._max_request_size)
 
170
                requests.append((start, next_size))
 
171
                size -= next_size
 
172
                start += next_size
 
173
        if 'sftp' in debug.debug_flags:
 
174
            mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
 
175
                self.relpath, len(sorted_offsets), len(coalesced),
 
176
                len(requests))
 
177
        return requests
 
178
 
 
179
    def request_and_yield_offsets(self, fp):
 
180
        """Request the data from the remote machine, yielding the results.
 
181
 
 
182
        :param fp: A Paramiko SFTPFile object that supports readv.
 
183
        :return: Yield the data requested by the original readv caller, one by
 
184
            one.
 
185
        """
 
186
        requests = self._get_requests()
 
187
        offset_iter = iter(self.original_offsets)
 
188
        cur_offset, cur_size = offset_iter.next()
 
189
        # paramiko .readv() yields strings that are in the order of the requests
 
190
        # So we track the current request to know where the next data is
 
191
        # being returned from.
 
192
        input_start = None
 
193
        last_end = None
 
194
        buffered_data = []
 
195
        buffered_len = 0
 
196
 
 
197
        # This is used to buffer chunks which we couldn't process yet
 
198
        # It is (start, end, data) tuples.
 
199
        data_chunks = []
 
200
        # Create an 'unlimited' data stream, so we stop based on requests,
 
201
        # rather than just because the data stream ended. This lets us detect
 
202
        # short readv.
 
203
        data_stream = itertools.chain(fp.readv(requests),
 
204
                                      itertools.repeat(None))
 
205
        for (start, length), data in itertools.izip(requests, data_stream):
 
206
            if data is None:
 
207
                if cur_coalesced is not None:
 
208
                    raise errors.ShortReadvError(self.relpath,
 
209
                        start, length, len(data))
 
210
            if len(data) != length:
 
211
                raise errors.ShortReadvError(self.relpath,
 
212
                    start, length, len(data))
 
213
            self._report_activity(length, 'read')
 
214
            if last_end is None:
 
215
                # This is the first request, just buffer it
 
216
                buffered_data = [data]
 
217
                buffered_len = length
 
218
                input_start = start
 
219
            elif start == last_end:
 
220
                # The data we are reading fits neatly on the previous
 
221
                # buffer, so this is all part of a larger coalesced range.
 
222
                buffered_data.append(data)
 
223
                buffered_len += length
 
224
            else:
 
225
                # We have an 'interrupt' in the data stream. So we know we are
 
226
                # at a request boundary.
 
227
                if buffered_len > 0:
 
228
                    # We haven't consumed the buffer so far, so put it into
 
229
                    # data_chunks, and continue.
 
230
                    buffered = ''.join(buffered_data)
 
231
                    data_chunks.append((input_start, buffered))
 
232
                input_start = start
 
233
                buffered_data = [data]
 
234
                buffered_len = length
 
235
            last_end = start + length
 
236
            if input_start == cur_offset and cur_size <= buffered_len:
 
237
                # Simplify the next steps a bit by transforming buffered_data
 
238
                # into a single string. We also have the nice property that
 
239
                # when there is only one string ''.join([x]) == x, so there is
 
240
                # no data copying.
 
241
                buffered = ''.join(buffered_data)
 
242
                # Clean out buffered data so that we keep memory
 
243
                # consumption low
 
244
                del buffered_data[:]
 
245
                buffered_offset = 0
 
246
                # TODO: We *could* also consider the case where cur_offset is in
 
247
                #       in the buffered range, even though it doesn't *start*
 
248
                #       the buffered range. But for packs we pretty much always
 
249
                #       read in order, so you won't get any extra data in the
 
250
                #       middle.
 
251
                while (input_start == cur_offset
 
252
                       and (buffered_offset + cur_size) <= buffered_len):
 
253
                    # We've buffered enough data to process this request, spit it
 
254
                    # out
 
255
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
 
256
                    # move the direct pointer into our buffered data
 
257
                    buffered_offset += cur_size
 
258
                    # Move the start-of-buffer pointer
 
259
                    input_start += cur_size
 
260
                    # Yield the requested data
 
261
                    yield cur_offset, cur_data
 
262
                    cur_offset, cur_size = offset_iter.next()
 
263
                # at this point, we've consumed as much of buffered as we can,
 
264
                # so break off the portion that we consumed
 
265
                if buffered_offset == len(buffered_data):
 
266
                    # No tail to leave behind
 
267
                    buffered_data = []
 
268
                    buffered_len = 0
 
269
                else:
 
270
                    buffered = buffered[buffered_offset:]
 
271
                    buffered_data = [buffered]
 
272
                    buffered_len = len(buffered)
 
273
        # now that the data stream is done, close the handle
 
274
        fp.close()
 
275
        if buffered_len:
 
276
            buffered = ''.join(buffered_data)
 
277
            del buffered_data[:]
 
278
            data_chunks.append((input_start, buffered))
 
279
        if data_chunks:
 
280
            if 'sftp' in debug.debug_flags:
 
281
                mutter('SFTP readv left with %d out-of-order bytes',
 
282
                    sum(map(lambda x: len(x[1]), data_chunks)))
 
283
            # We've processed all the readv data, at this point, anything we
 
284
            # couldn't process is in data_chunks. This doesn't happen often, so
 
285
            # this code path isn't optimized
 
286
            # We use an interesting process for data_chunks
 
287
            # Specifically if we have "bisect_left([(start, len, entries)],
 
288
            #                                       (qstart,)])
 
289
            # If start == qstart, then we get the specific node. Otherwise we
 
290
            # get the previous node
 
291
            while True:
 
292
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
 
293
                if idx < len(data_chunks) and data_chunks[idx][0] == cur_offset:
 
294
                    # The data starts here
 
295
                    data = data_chunks[idx][1][:cur_size]
 
296
                elif idx > 0:
 
297
                    # The data is in a portion of a previous page
 
298
                    idx -= 1
 
299
                    sub_offset = cur_offset - data_chunks[idx][0]
 
300
                    data = data_chunks[idx][1]
 
301
                    data = data[sub_offset:sub_offset + cur_size]
 
302
                else:
 
303
                    # We are missing the page where the data should be found,
 
304
                    # something is wrong
 
305
                    data = ''
 
306
                if len(data) != cur_size:
 
307
                    raise AssertionError('We must have miscalulated.'
 
308
                        ' We expected %d bytes, but only found %d'
 
309
                        % (cur_size, len(data)))
 
310
                yield cur_offset, data
 
311
                cur_offset, cur_size = offset_iter.next()
 
312
 
 
313
 
149
314
class SFTPTransport(ConnectedTransport):
150
315
    """Transport implementation for SFTP access."""
151
316
 
167
332
    # up the request itself, rather than us having to worry about it
168
333
    _max_request_size = 32768
169
334
 
170
 
    def __init__(self, base, _from_transport=None):
171
 
        assert base.startswith('sftp://')
172
 
        super(SFTPTransport, self).__init__(base,
173
 
                                            _from_transport=_from_transport)
174
 
 
175
335
    def _remote_path(self, relpath):
176
336
        """Return the path to be passed along the sftp protocol for relpath.
177
 
        
 
337
 
178
338
        :param relpath: is a urlencoded string.
179
339
        """
180
 
        relative = urlutils.unescape(relpath).encode('utf-8')
181
 
        remote_path = self._combine_paths(self._path, relative)
 
340
        remote_path = self._parsed_url.clone(relpath).path
182
341
        # the initial slash should be removed from the path, and treated as a
183
342
        # homedir relative path (the path begins with a double slash if it is
184
343
        # absolute).  see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
203
362
        in base url at transport creation time.
204
363
        """
205
364
        if credentials is None:
206
 
            password = self._password
 
365
            password = self._parsed_url.password
207
366
        else:
208
367
            password = credentials
209
368
 
210
369
        vendor = ssh._get_ssh_vendor()
211
 
        connection = vendor.connect_sftp(self._user, password,
212
 
                                         self._host, self._port)
213
 
        return connection, password
 
370
        user = self._parsed_url.user
 
371
        if user is None:
 
372
            auth = config.AuthenticationConfig()
 
373
            user = auth.get_user('ssh', self._parsed_url.host,
 
374
                self._parsed_url.port)
 
375
        connection = vendor.connect_sftp(self._parsed_url.user, password,
 
376
            self._parsed_url.host, self._parsed_url.port)
 
377
        return connection, (user, password)
 
378
 
 
379
    def disconnect(self):
 
380
        connection = self._get_connection()
 
381
        if connection is not None:
 
382
            connection.close()
214
383
 
215
384
    def _get_sftp(self):
216
385
        """Ensures that a connection is established"""
227
396
        """
228
397
        try:
229
398
            self._get_sftp().stat(self._remote_path(relpath))
 
399
            # stat result is about 20 bytes, let's say
 
400
            self._report_activity(20, 'read')
230
401
            return True
231
402
        except IOError:
232
403
            return False
233
404
 
234
405
    def get(self, relpath):
235
 
        """
236
 
        Get the file at the given relative path.
 
406
        """Get the file at the given relative path.
237
407
 
238
408
        :param relpath: The relative path to the file
239
409
        """
247
417
            self._translate_io_exception(e, path, ': error retrieving',
248
418
                failure_exc=errors.ReadError)
249
419
 
250
 
    def readv(self, relpath, offsets):
 
420
    def get_bytes(self, relpath):
 
421
        # reimplement this here so that we can report how many bytes came back
 
422
        f = self.get(relpath)
 
423
        try:
 
424
            bytes = f.read()
 
425
            self._report_activity(len(bytes), 'read')
 
426
            return bytes
 
427
        finally:
 
428
            f.close()
 
429
 
 
430
    def _readv(self, relpath, offsets):
251
431
        """See Transport.readv()"""
252
432
        # We overload the default readv() because we want to use a file
253
433
        # that does not have prefetch enabled.
261
441
            readv = getattr(fp, 'readv', None)
262
442
            if readv:
263
443
                return self._sftp_readv(fp, offsets, relpath)
264
 
            mutter('seek and read %s offsets', len(offsets))
 
444
            if 'sftp' in debug.debug_flags:
 
445
                mutter('seek and read %s offsets', len(offsets))
265
446
            return self._seek_and_read(fp, offsets, relpath)
266
447
        except (IOError, paramiko.SSHException), e:
267
448
            self._translate_io_exception(e, path, ': error retrieving')
274
455
        """
275
456
        return 64 * 1024
276
457
 
277
 
    def _sftp_readv(self, fp, offsets, relpath='<unknown>'):
 
458
    def _sftp_readv(self, fp, offsets, relpath):
278
459
        """Use the readv() member of fp to do async readv.
279
460
 
280
 
        And then read them using paramiko.readv(). paramiko.readv()
 
461
        Then read them using paramiko.readv(). paramiko.readv()
281
462
        does not support ranges > 64K, so it caps the request size, and
282
 
        just reads until it gets all the stuff it wants
 
463
        just reads until it gets all the stuff it wants.
283
464
        """
284
 
        offsets = list(offsets)
285
 
        sorted_offsets = sorted(offsets)
286
 
 
287
 
        # The algorithm works as follows:
288
 
        # 1) Coalesce nearby reads into a single chunk
289
 
        #    This generates a list of combined regions, the total size
290
 
        #    and the size of the sub regions. This coalescing step is limited
291
 
        #    in the number of nearby chunks to combine, and is allowed to
292
 
        #    skip small breaks in the requests. Limiting it makes sure that
293
 
        #    we can start yielding some data earlier, and skipping means we
294
 
        #    make fewer requests. (Beneficial even when using async)
295
 
        # 2) Break up this combined regions into chunks that are smaller
296
 
        #    than 64KiB. Technically the limit is 65536, but we are a
297
 
        #    little bit conservative. This is because sftp has a maximum
298
 
        #    return chunk size of 64KiB (max size of an unsigned short)
299
 
        # 3) Issue a readv() to paramiko to create an async request for
300
 
        #    all of this data
301
 
        # 4) Read in the data as it comes back, until we've read one
302
 
        #    continuous section as determined in step 1
303
 
        # 5) Break up the full sections into hunks for the original requested
304
 
        #    offsets. And put them in a cache
305
 
        # 6) Check if the next request is in the cache, and if it is, remove
306
 
        #    it from the cache, and yield its data. Continue until no more
307
 
        #    entries are in the cache.
308
 
        # 7) loop back to step 4 until all data has been read
309
 
        #
310
 
        # TODO: jam 20060725 This could be optimized one step further, by
311
 
        #       attempting to yield whatever data we have read, even before
312
 
        #       the first coallesced section has been fully processed.
313
 
 
314
 
        # When coalescing for use with readv(), we don't really need to
315
 
        # use any fudge factor, because the requests are made asynchronously
316
 
        coalesced = list(self._coalesce_offsets(sorted_offsets,
317
 
                               limit=self._max_readv_combine,
318
 
                               fudge_factor=0,
319
 
                               ))
320
 
        requests = []
321
 
        for c_offset in coalesced:
322
 
            start = c_offset.start
323
 
            size = c_offset.length
324
 
 
325
 
            # We need to break this up into multiple requests
326
 
            while size > 0:
327
 
                next_size = min(size, self._max_request_size)
328
 
                requests.append((start, next_size))
329
 
                size -= next_size
330
 
                start += next_size
331
 
 
332
 
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
333
 
                len(offsets), len(coalesced), len(requests))
334
 
 
335
 
        # Queue the current read until we have read the full coalesced section
336
 
        cur_data = []
337
 
        cur_data_len = 0
338
 
        cur_coalesced_stack = iter(coalesced)
339
 
        cur_coalesced = cur_coalesced_stack.next()
340
 
 
341
 
        # Cache the results, but only until they have been fulfilled
342
 
        data_map = {}
343
 
        # turn the list of offsets into a stack
344
 
        offset_stack = iter(offsets)
345
 
        cur_offset_and_size = offset_stack.next()
346
 
 
347
 
        for data in fp.readv(requests):
348
 
            cur_data += data
349
 
            cur_data_len += len(data)
350
 
 
351
 
            if cur_data_len < cur_coalesced.length:
352
 
                continue
353
 
            assert cur_data_len == cur_coalesced.length, \
354
 
                "Somehow we read too much: %s != %s" % (cur_data_len,
355
 
                                                        cur_coalesced.length)
356
 
            all_data = ''.join(cur_data)
357
 
            cur_data = []
358
 
            cur_data_len = 0
359
 
 
360
 
            for suboffset, subsize in cur_coalesced.ranges:
361
 
                key = (cur_coalesced.start+suboffset, subsize)
362
 
                data_map[key] = all_data[suboffset:suboffset+subsize]
363
 
 
364
 
            # Now that we've read some data, see if we can yield anything back
365
 
            while cur_offset_and_size in data_map:
366
 
                this_data = data_map.pop(cur_offset_and_size)
367
 
                yield cur_offset_and_size[0], this_data
368
 
                cur_offset_and_size = offset_stack.next()
369
 
 
370
 
            # We read a coalesced entry, so mark it as done
371
 
            cur_coalesced = None
372
 
            # Now that we've read all of the data for this coalesced section
373
 
            # on to the next
374
 
            cur_coalesced = cur_coalesced_stack.next()
375
 
 
376
 
        if cur_coalesced is not None:
377
 
            raise errors.ShortReadvError(relpath, cur_coalesced.start,
378
 
                cur_coalesced.length, len(data))
 
465
        helper = _SFTPReadvHelper(offsets, relpath, self._report_activity)
 
466
        return helper.request_and_yield_offsets(fp)
379
467
 
380
468
    def put_file(self, relpath, f, mode=None):
381
469
        """
386
474
        :param mode: The final mode for the file
387
475
        """
388
476
        final_path = self._remote_path(relpath)
389
 
        self._put(final_path, f, mode=mode)
 
477
        return self._put(final_path, f, mode=mode)
390
478
 
391
479
    def _put(self, abspath, f, mode=None):
392
480
        """Helper function so both put() and copy_abspaths can reuse the code"""
397
485
        try:
398
486
            try:
399
487
                fout.set_pipelined(True)
400
 
                self._pump(f, fout)
 
488
                length = self._pump(f, fout)
401
489
            except (IOError, paramiko.SSHException), e:
402
490
                self._translate_io_exception(e, tmp_abspath)
403
491
            # XXX: This doesn't truly help like we would like it to.
406
494
            #      sticky bit. So it is probably best to stop chmodding, and
407
495
            #      just tell users that they need to set the umask correctly.
408
496
            #      The attr.st_mode = mode, in _sftp_open_exclusive
409
 
            #      will handle when the user wants the final mode to be more 
410
 
            #      restrictive. And then we avoid a round trip. Unless 
 
497
            #      will handle when the user wants the final mode to be more
 
498
            #      restrictive. And then we avoid a round trip. Unless
411
499
            #      paramiko decides to expose an async chmod()
412
500
 
413
501
            # This is designed to chmod() right before we close.
414
 
            # Because we set_pipelined() earlier, theoretically we might 
 
502
            # Because we set_pipelined() earlier, theoretically we might
415
503
            # avoid the round trip for fout.close()
416
504
            if mode is not None:
417
505
                self._get_sftp().chmod(tmp_abspath, mode)
418
506
            fout.close()
419
507
            closed = True
420
508
            self._rename_and_overwrite(tmp_abspath, abspath)
 
509
            return length
421
510
        except Exception, e:
422
511
            # If we fail, try to clean up the temporary file
423
512
            # before we throw the exception
458
547
                                                 ': unable to open')
459
548
 
460
549
                # This is designed to chmod() right before we close.
461
 
                # Because we set_pipelined() earlier, theoretically we might 
 
550
                # Because we set_pipelined() earlier, theoretically we might
462
551
                # avoid the round trip for fout.close()
463
552
                if mode is not None:
464
553
                    self._get_sftp().chmod(abspath, mode)
504
593
                                    create_parent_dir=create_parent_dir,
505
594
                                    dir_mode=dir_mode)
506
595
 
507
 
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
596
    def put_bytes_non_atomic(self, relpath, raw_bytes, mode=None,
508
597
                             create_parent_dir=False,
509
598
                             dir_mode=None):
 
599
        if not isinstance(raw_bytes, str):
 
600
            raise TypeError(
 
601
                'raw_bytes must be a plain string, not %s' % type(raw_bytes))
 
602
 
510
603
        def writer(fout):
511
 
            fout.write(bytes)
 
604
            fout.write(raw_bytes)
512
605
        self._put_non_atomic_helper(relpath, writer, mode=mode,
513
606
                                    create_parent_dir=create_parent_dir,
514
607
                                    dir_mode=dir_mode)
515
608
 
516
609
    def iter_files_recursive(self):
517
610
        """Walk the relative paths of all files in this transport."""
 
611
        # progress is handled by list_dir
518
612
        queue = list(self.list_dir('.'))
519
613
        while queue:
520
614
            relpath = queue.pop(0)
531
625
        else:
532
626
            local_mode = mode
533
627
        try:
 
628
            self._report_activity(len(abspath), 'write')
534
629
            self._get_sftp().mkdir(abspath, local_mode)
 
630
            self._report_activity(1, 'read')
535
631
            if mode is not None:
536
 
                self._get_sftp().chmod(abspath, mode=mode)
 
632
                # chmod a dir through sftp will erase any sgid bit set
 
633
                # on the server side.  So, if the bit mode are already
 
634
                # set, avoid the chmod.  If the mode is not fine but
 
635
                # the sgid bit is set, report a warning to the user
 
636
                # with the umask fix.
 
637
                stat = self._get_sftp().lstat(abspath)
 
638
                mode = mode & 0777 # can't set special bits anyway
 
639
                if mode != stat.st_mode & 0777:
 
640
                    if stat.st_mode & 06000:
 
641
                        warning('About to chmod %s over sftp, which will result'
 
642
                                ' in its suid or sgid bits being cleared.  If'
 
643
                                ' you want to preserve those bits, change your '
 
644
                                ' environment on the server to use umask 0%03o.'
 
645
                                % (abspath, 0777 - mode))
 
646
                    self._get_sftp().chmod(abspath, mode=mode)
537
647
        except (paramiko.SSHException, IOError), e:
538
648
            self._translate_io_exception(e, abspath, ': unable to mkdir',
539
649
                failure_exc=FileExists)
545
655
    def open_write_stream(self, relpath, mode=None):
546
656
        """See Transport.open_write_stream."""
547
657
        # initialise the file to zero-length
548
 
        # this is three round trips, but we don't use this 
549
 
        # api more than once per write_group at the moment so 
 
658
        # this is three round trips, but we don't use this
 
659
        # api more than once per write_group at the moment so
550
660
        # it is a tolerable overhead. Better would be to truncate
551
661
        # the file after opening. RBC 20070805
552
662
        self.put_bytes_non_atomic(relpath, "", mode)
575
685
        :param failure_exc: Paramiko has the super fun ability to raise completely
576
686
                           opaque errors that just set "e.args = ('Failure',)" with
577
687
                           no more information.
578
 
                           If this parameter is set, it defines the exception 
 
688
                           If this parameter is set, it defines the exception
579
689
                           to raise in these cases.
580
690
        """
581
691
        # paramiko seems to generate detailless errors.
584
694
            if (e.args == ('No such file or directory',) or
585
695
                e.args == ('No such file',)):
586
696
                raise NoSuchFile(path, str(e) + more_info)
587
 
            if (e.args == ('mkdir failed',)):
 
697
            if (e.args == ('mkdir failed',) or
 
698
                e.args[0].startswith('syserr: File exists')):
588
699
                raise FileExists(path, str(e) + more_info)
589
700
            # strange but true, for the paramiko server.
590
701
            if (e.args == ('Failure',)):
591
702
                raise failure_exc(path, str(e) + more_info)
 
703
            # Can be something like args = ('Directory not empty:
 
704
            # '/srv/bazaar.launchpad.net/blah...: '
 
705
            # [Errno 39] Directory not empty',)
 
706
            if (e.args[0].startswith('Directory not empty: ')
 
707
                or getattr(e, 'errno', None) == errno.ENOTEMPTY):
 
708
                raise errors.DirectoryNotEmpty(path, str(e))
 
709
            if e.args == ('Operation unsupported',):
 
710
                raise errors.TransportNotPossible()
592
711
            mutter('Raising exception with args %s', e.args)
593
712
        if getattr(e, 'errno', None) is not None:
594
713
            mutter('Raising exception with errno %s', e.errno)
621
740
 
622
741
    def _rename_and_overwrite(self, abs_from, abs_to):
623
742
        """Do a fancy rename on the remote server.
624
 
        
 
743
 
625
744
        Using the implementation provided by osutils.
626
745
        """
627
746
        try:
646
765
            self._get_sftp().remove(path)
647
766
        except (IOError, paramiko.SSHException), e:
648
767
            self._translate_io_exception(e, path, ': unable to delete')
649
 
            
 
768
 
650
769
    def external_url(self):
651
770
        """See bzrlib.transport.Transport.external_url."""
652
771
        # the external path for SFTP is the base
667
786
        path = self._remote_path(relpath)
668
787
        try:
669
788
            entries = self._get_sftp().listdir(path)
 
789
            self._report_activity(sum(map(len, entries)), 'read')
670
790
        except (IOError, paramiko.SSHException), e:
671
791
            self._translate_io_exception(e, path, ': failed to list_dir')
672
792
        return [urlutils.escape(entry) for entry in entries]
683
803
        """Return the stat information for a file."""
684
804
        path = self._remote_path(relpath)
685
805
        try:
686
 
            return self._get_sftp().stat(path)
 
806
            return self._get_sftp().lstat(path)
687
807
        except (IOError, paramiko.SSHException), e:
688
808
            self._translate_io_exception(e, path, ': unable to stat')
689
809
 
 
810
    def readlink(self, relpath):
 
811
        """See Transport.readlink."""
 
812
        path = self._remote_path(relpath)
 
813
        try:
 
814
            return self._get_sftp().readlink(path)
 
815
        except (IOError, paramiko.SSHException), e:
 
816
            self._translate_io_exception(e, path, ': unable to readlink')
 
817
 
 
818
    def symlink(self, source, link_name):
 
819
        """See Transport.symlink."""
 
820
        try:
 
821
            conn = self._get_sftp()
 
822
            sftp_retval = conn.symlink(source, link_name)
 
823
            if SFTP_OK != sftp_retval:
 
824
                raise TransportError(
 
825
                    '%r: unable to create symlink to %r' % (link_name, source),
 
826
                    sftp_retval
 
827
                )
 
828
        except (IOError, paramiko.SSHException), e:
 
829
            self._translate_io_exception(e, link_name,
 
830
                                         ': unable to create symlink to %r' % (source))
 
831
 
690
832
    def lock_read(self, relpath):
691
833
        """
692
834
        Lock the given file for shared (read) access.
729
871
        """
730
872
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
731
873
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
732
 
        #       However, there is no way to set the permission mode at open 
 
874
        #       However, there is no way to set the permission mode at open
733
875
        #       time using the sftp_client.file() functionality.
734
876
        path = self._get_sftp()._adjust_cwd(abspath)
735
877
        # mutter('sftp abspath %s => %s', abspath, path)
736
878
        attr = SFTPAttributes()
737
879
        if mode is not None:
738
880
            attr.st_mode = mode
739
 
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE 
 
881
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
740
882
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
741
883
        try:
742
884
            t, msg = self._get_sftp()._request(CMD_OPEN, path, omode, attr)
755
897
        else:
756
898
            return True
757
899
 
758
 
# ------------- server test implementation --------------
759
 
import threading
760
 
 
761
 
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
762
 
 
763
 
STUB_SERVER_KEY = """
764
 
-----BEGIN RSA PRIVATE KEY-----
765
 
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
766
 
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
767
 
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
768
 
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
769
 
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
770
 
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
771
 
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
772
 
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
773
 
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
774
 
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
775
 
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
776
 
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
777
 
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
778
 
-----END RSA PRIVATE KEY-----
779
 
"""
780
 
 
781
 
 
782
 
class SocketListener(threading.Thread):
783
 
 
784
 
    def __init__(self, callback):
785
 
        threading.Thread.__init__(self)
786
 
        self._callback = callback
787
 
        self._socket = socket.socket()
788
 
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
789
 
        self._socket.bind(('localhost', 0))
790
 
        self._socket.listen(1)
791
 
        self.port = self._socket.getsockname()[1]
792
 
        self._stop_event = threading.Event()
793
 
 
794
 
    def stop(self):
795
 
        # called from outside this thread
796
 
        self._stop_event.set()
797
 
        # use a timeout here, because if the test fails, the server thread may
798
 
        # never notice the stop_event.
799
 
        self.join(5.0)
800
 
        self._socket.close()
801
 
 
802
 
    def run(self):
803
 
        while True:
804
 
            readable, writable_unused, exception_unused = \
805
 
                select.select([self._socket], [], [], 0.1)
806
 
            if self._stop_event.isSet():
807
 
                return
808
 
            if len(readable) == 0:
809
 
                continue
810
 
            try:
811
 
                s, addr_unused = self._socket.accept()
812
 
                # because the loopback socket is inline, and transports are
813
 
                # never explicitly closed, best to launch a new thread.
814
 
                threading.Thread(target=self._callback, args=(s,)).start()
815
 
            except socket.error, x:
816
 
                sys.excepthook(*sys.exc_info())
817
 
                warning('Socket error during accept() within unit test server'
818
 
                        ' thread: %r' % x)
819
 
            except Exception, x:
820
 
                # probably a failed test; unit test thread will log the
821
 
                # failure/error
822
 
                sys.excepthook(*sys.exc_info())
823
 
                warning('Exception from within unit test server thread: %r' % 
824
 
                        x)
825
 
 
826
 
 
827
 
class SocketDelay(object):
828
 
    """A socket decorator to make TCP appear slower.
829
 
 
830
 
    This changes recv, send, and sendall to add a fixed latency to each python
831
 
    call if a new roundtrip is detected. That is, when a recv is called and the
832
 
    flag new_roundtrip is set, latency is charged. Every send and send_all
833
 
    sets this flag.
834
 
 
835
 
    In addition every send, sendall and recv sleeps a bit per character send to
836
 
    simulate bandwidth.
837
 
 
838
 
    Not all methods are implemented, this is deliberate as this class is not a
839
 
    replacement for the builtin sockets layer. fileno is not implemented to
840
 
    prevent the proxy being bypassed. 
841
 
    """
842
 
 
843
 
    simulated_time = 0
844
 
    _proxied_arguments = dict.fromkeys([
845
 
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
846
 
        "setblocking", "setsockopt", "settimeout", "shutdown"])
847
 
 
848
 
    def __init__(self, sock, latency, bandwidth=1.0, 
849
 
                 really_sleep=True):
850
 
        """ 
851
 
        :param bandwith: simulated bandwith (MegaBit)
852
 
        :param really_sleep: If set to false, the SocketDelay will just
853
 
        increase a counter, instead of calling time.sleep. This is useful for
854
 
        unittesting the SocketDelay.
855
 
        """
856
 
        self.sock = sock
857
 
        self.latency = latency
858
 
        self.really_sleep = really_sleep
859
 
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024) 
860
 
        self.new_roundtrip = False
861
 
 
862
 
    def sleep(self, s):
863
 
        if self.really_sleep:
864
 
            time.sleep(s)
865
 
        else:
866
 
            SocketDelay.simulated_time += s
867
 
 
868
 
    def __getattr__(self, attr):
869
 
        if attr in SocketDelay._proxied_arguments:
870
 
            return getattr(self.sock, attr)
871
 
        raise AttributeError("'SocketDelay' object has no attribute %r" %
872
 
                             attr)
873
 
 
874
 
    def dup(self):
875
 
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
876
 
                           self._sleep)
877
 
 
878
 
    def recv(self, *args):
879
 
        data = self.sock.recv(*args)
880
 
        if data and self.new_roundtrip:
881
 
            self.new_roundtrip = False
882
 
            self.sleep(self.latency)
883
 
        self.sleep(len(data) * self.time_per_byte)
884
 
        return data
885
 
 
886
 
    def sendall(self, data, flags=0):
887
 
        if not self.new_roundtrip:
888
 
            self.new_roundtrip = True
889
 
            self.sleep(self.latency)
890
 
        self.sleep(len(data) * self.time_per_byte)
891
 
        return self.sock.sendall(data, flags)
892
 
 
893
 
    def send(self, data, flags=0):
894
 
        if not self.new_roundtrip:
895
 
            self.new_roundtrip = True
896
 
            self.sleep(self.latency)
897
 
        bytes_sent = self.sock.send(data, flags)
898
 
        self.sleep(bytes_sent * self.time_per_byte)
899
 
        return bytes_sent
900
 
 
901
 
 
902
 
class SFTPServer(Server):
903
 
    """Common code for SFTP server facilities."""
904
 
 
905
 
    def __init__(self, server_interface=StubServer):
906
 
        self._original_vendor = None
907
 
        self._homedir = None
908
 
        self._server_homedir = None
909
 
        self._listener = None
910
 
        self._root = None
911
 
        self._vendor = ssh.ParamikoVendor()
912
 
        self._server_interface = server_interface
913
 
        # sftp server logs
914
 
        self.logs = []
915
 
        self.add_latency = 0
916
 
 
917
 
    def _get_sftp_url(self, path):
918
 
        """Calculate an sftp url to this server for path."""
919
 
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
920
 
 
921
 
    def log(self, message):
922
 
        """StubServer uses this to log when a new server is created."""
923
 
        self.logs.append(message)
924
 
 
925
 
    def _run_server_entry(self, sock):
926
 
        """Entry point for all implementations of _run_server.
927
 
        
928
 
        If self.add_latency is > 0.000001 then sock is given a latency adding
929
 
        decorator.
930
 
        """
931
 
        if self.add_latency > 0.000001:
932
 
            sock = SocketDelay(sock, self.add_latency)
933
 
        return self._run_server(sock)
934
 
 
935
 
    def _run_server(self, s):
936
 
        ssh_server = paramiko.Transport(s)
937
 
        key_file = pathjoin(self._homedir, 'test_rsa.key')
938
 
        f = open(key_file, 'w')
939
 
        f.write(STUB_SERVER_KEY)
940
 
        f.close()
941
 
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
942
 
        ssh_server.add_server_key(host_key)
943
 
        server = self._server_interface(self)
944
 
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
945
 
                                         StubSFTPServer, root=self._root,
946
 
                                         home=self._server_homedir)
947
 
        event = threading.Event()
948
 
        ssh_server.start_server(event, server)
949
 
        event.wait(5.0)
950
 
    
951
 
    def setUp(self, backing_server=None):
952
 
        # XXX: TODO: make sftpserver back onto backing_server rather than local
953
 
        # disk.
954
 
        assert (backing_server is None or
955
 
                isinstance(backing_server, local.LocalURLServer)), (
956
 
            "backing_server should not be %r, because this can only serve the "
957
 
            "local current working directory." % (backing_server,))
958
 
        self._original_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
959
 
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._vendor
960
 
        if sys.platform == 'win32':
961
 
            # Win32 needs to use the UNICODE api
962
 
            self._homedir = getcwd()
963
 
        else:
964
 
            # But Linux SFTP servers should just deal in bytestreams
965
 
            self._homedir = os.getcwd()
966
 
        if self._server_homedir is None:
967
 
            self._server_homedir = self._homedir
968
 
        self._root = '/'
969
 
        if sys.platform == 'win32':
970
 
            self._root = ''
971
 
        self._listener = SocketListener(self._run_server_entry)
972
 
        self._listener.setDaemon(True)
973
 
        self._listener.start()
974
 
 
975
 
    def tearDown(self):
976
 
        """See bzrlib.transport.Server.tearDown."""
977
 
        self._listener.stop()
978
 
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
979
 
 
980
 
    def get_bogus_url(self):
981
 
        """See bzrlib.transport.Server.get_bogus_url."""
982
 
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
983
 
        # we bind a random socket, so that we get a guaranteed unused port
984
 
        # we just never listen on that port
985
 
        s = socket.socket()
986
 
        s.bind(('localhost', 0))
987
 
        return 'sftp://%s:%s/' % s.getsockname()
988
 
 
989
 
 
990
 
class SFTPFullAbsoluteServer(SFTPServer):
991
 
    """A test server for sftp transports, using absolute urls and ssh."""
992
 
 
993
 
    def get_url(self):
994
 
        """See bzrlib.transport.Server.get_url."""
995
 
        homedir = self._homedir
996
 
        if sys.platform != 'win32':
997
 
            # Remove the initial '/' on all platforms but win32
998
 
            homedir = homedir[1:]
999
 
        return self._get_sftp_url(urlutils.escape(homedir))
1000
 
 
1001
 
 
1002
 
class SFTPServerWithoutSSH(SFTPServer):
1003
 
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
1004
 
 
1005
 
    def __init__(self):
1006
 
        super(SFTPServerWithoutSSH, self).__init__()
1007
 
        self._vendor = ssh.LoopbackVendor()
1008
 
 
1009
 
    def _run_server(self, sock):
1010
 
        # Re-import these as locals, so that they're still accessible during
1011
 
        # interpreter shutdown (when all module globals get set to None, leading
1012
 
        # to confusing errors like "'NoneType' object has no attribute 'error'".
1013
 
        class FakeChannel(object):
1014
 
            def get_transport(self):
1015
 
                return self
1016
 
            def get_log_channel(self):
1017
 
                return 'paramiko'
1018
 
            def get_name(self):
1019
 
                return '1'
1020
 
            def get_hexdump(self):
1021
 
                return False
1022
 
            def close(self):
1023
 
                pass
1024
 
 
1025
 
        server = paramiko.SFTPServer(FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
1026
 
                                     root=self._root, home=self._server_homedir)
1027
 
        try:
1028
 
            server.start_subsystem('sftp', None, sock)
1029
 
        except socket.error, e:
1030
 
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
1031
 
                # it's okay for the client to disconnect abruptly
1032
 
                # (bug in paramiko 1.6: it should absorb this exception)
1033
 
                pass
1034
 
            else:
1035
 
                raise
1036
 
        except Exception, e:
1037
 
            # This typically seems to happen during interpreter shutdown, so
1038
 
            # most of the useful ways to report this error are won't work.
1039
 
            # Writing the exception type, and then the text of the exception,
1040
 
            # seems to be the best we can do.
1041
 
            import sys
1042
 
            sys.stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
1043
 
            sys.stderr.write('%s\n\n' % (e,))
1044
 
        server.finish_subsystem()
1045
 
 
1046
 
 
1047
 
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
1048
 
    """A test server for sftp transports, using absolute urls."""
1049
 
 
1050
 
    def get_url(self):
1051
 
        """See bzrlib.transport.Server.get_url."""
1052
 
        homedir = self._homedir
1053
 
        if sys.platform != 'win32':
1054
 
            # Remove the initial '/' on all platforms but win32
1055
 
            homedir = homedir[1:]
1056
 
        return self._get_sftp_url(urlutils.escape(homedir))
1057
 
 
1058
 
 
1059
 
class SFTPHomeDirServer(SFTPServerWithoutSSH):
1060
 
    """A test server for sftp transports, using homedir relative urls."""
1061
 
 
1062
 
    def get_url(self):
1063
 
        """See bzrlib.transport.Server.get_url."""
1064
 
        return self._get_sftp_url("~/")
1065
 
 
1066
 
 
1067
 
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
1068
 
    """A test server for sftp transports where only absolute paths will work.
1069
 
 
1070
 
    It does this by serving from a deeply-nested directory that doesn't exist.
1071
 
    """
1072
 
 
1073
 
    def setUp(self, backing_server=None):
1074
 
        self._server_homedir = '/dev/noone/runs/tests/here'
1075
 
        super(SFTPSiblingAbsoluteServer, self).setUp(backing_server)
1076
 
 
1077
900
 
1078
901
def get_test_permutations():
1079
902
    """Return the permutations to be used in testing."""
1080
 
    return [(SFTPTransport, SFTPAbsoluteServer),
1081
 
            (SFTPTransport, SFTPHomeDirServer),
1082
 
            (SFTPTransport, SFTPSiblingAbsoluteServer),
 
903
    from bzrlib.tests import stub_sftp
 
904
    return [(SFTPTransport, stub_sftp.SFTPAbsoluteServer),
 
905
            (SFTPTransport, stub_sftp.SFTPHomeDirServer),
 
906
            (SFTPTransport, stub_sftp.SFTPSiblingAbsoluteServer),
1083
907
            ]