~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: Andrew Bennetts
  • Date: 2009-12-03 02:24:54 UTC
  • mfrom: (4634.101.4 2.0)
  • mto: This revision was merged to the branch mainline in revision 4857.
  • Revision ID: andrew.bennetts@canonical.com-20091203022454-m2gyhbcdqi1t7ujz
Merge lp:bzr/2.0 into lp:bzr.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Implementation of Transport over SFTP, using paramiko."""
18
18
 
19
 
from __future__ import absolute_import
20
 
 
21
19
# TODO: Remove the transport-based lock_read and lock_write methods.  They'll
22
20
# then raise TransportNotPossible, which will break remote access to any
23
21
# formats which rely on OS-level locks.  That should be fine as those formats
30
28
import itertools
31
29
import os
32
30
import random
 
31
import select
 
32
import socket
33
33
import stat
34
34
import sys
35
35
import time
 
36
import urllib
 
37
import urlparse
36
38
import warnings
37
39
 
38
40
from bzrlib import (
42
44
    urlutils,
43
45
    )
44
46
from bzrlib.errors import (FileExists,
45
 
                           NoSuchFile,
 
47
                           NoSuchFile, PathNotChild,
46
48
                           TransportError,
47
49
                           LockError,
48
50
                           PathError,
49
51
                           ParamikoNotPresent,
50
52
                           )
51
 
from bzrlib.osutils import fancy_rename
 
53
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
 
54
from bzrlib.symbol_versioning import (
 
55
        deprecated_function,
 
56
        )
52
57
from bzrlib.trace import mutter, warning
53
58
from bzrlib.transport import (
54
59
    FileFileStream,
55
60
    _file_streams,
 
61
    local,
 
62
    Server,
56
63
    ssh,
57
64
    ConnectedTransport,
58
65
    )
77
84
else:
78
85
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
79
86
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
80
 
                               SFTP_OK, CMD_HANDLE, CMD_OPEN)
 
87
                               CMD_HANDLE, CMD_OPEN)
81
88
    from paramiko.sftp_attr import SFTPAttributes
82
89
    from paramiko.sftp_file import SFTPFile
83
90
 
109
116
        except FileExists:
110
117
            raise LockError('File %r already locked' % (self.path,))
111
118
 
 
119
    def __del__(self):
 
120
        """Should this warn, or actually try to cleanup?"""
 
121
        if self.lock_file:
 
122
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))
 
123
            self.unlock()
 
124
 
112
125
    def unlock(self):
113
126
        if not self.lock_file:
114
127
            return
270
283
                    buffered = buffered[buffered_offset:]
271
284
                    buffered_data = [buffered]
272
285
                    buffered_len = len(buffered)
273
 
        # now that the data stream is done, close the handle
274
 
        fp.close()
275
286
        if buffered_len:
276
287
            buffered = ''.join(buffered_data)
277
288
            del buffered_data[:]
332
343
    # up the request itself, rather than us having to worry about it
333
344
    _max_request_size = 32768
334
345
 
 
346
    def __init__(self, base, _from_transport=None):
 
347
        super(SFTPTransport, self).__init__(base,
 
348
                                            _from_transport=_from_transport)
 
349
 
335
350
    def _remote_path(self, relpath):
336
351
        """Return the path to be passed along the sftp protocol for relpath.
337
352
 
338
353
        :param relpath: is a urlencoded string.
339
354
        """
340
 
        remote_path = self._parsed_url.clone(relpath).path
 
355
        relative = urlutils.unescape(relpath).encode('utf-8')
 
356
        remote_path = self._combine_paths(self._path, relative)
341
357
        # the initial slash should be removed from the path, and treated as a
342
358
        # homedir relative path (the path begins with a double slash if it is
343
359
        # absolute).  see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
362
378
        in base url at transport creation time.
363
379
        """
364
380
        if credentials is None:
365
 
            password = self._parsed_url.password
 
381
            password = self._password
366
382
        else:
367
383
            password = credentials
368
384
 
369
385
        vendor = ssh._get_ssh_vendor()
370
 
        user = self._parsed_url.user
 
386
        user = self._user
371
387
        if user is None:
372
388
            auth = config.AuthenticationConfig()
373
 
            user = auth.get_user('ssh', self._parsed_url.host,
374
 
                self._parsed_url.port)
375
 
        connection = vendor.connect_sftp(self._parsed_url.user, password,
376
 
            self._parsed_url.host, self._parsed_url.port)
 
389
            user = auth.get_user('ssh', self._host, self._port)
 
390
        connection = vendor.connect_sftp(self._user, password,
 
391
                                         self._host, self._port)
377
392
        return connection, (user, password)
378
393
 
379
 
    def disconnect(self):
380
 
        connection = self._get_connection()
381
 
        if connection is not None:
382
 
            connection.close()
383
 
 
384
394
    def _get_sftp(self):
385
395
        """Ensures that a connection is established"""
386
396
        connection = self._get_connection()
408
418
        :param relpath: The relative path to the file
409
419
        """
410
420
        try:
 
421
            # FIXME: by returning the file directly, we don't pass this
 
422
            # through to report_activity.  We could try wrapping the object
 
423
            # before it's returned.  For readv and get_bytes it's handled in
 
424
            # the higher-level function.
 
425
            # -- mbp 20090126
411
426
            path = self._remote_path(relpath)
412
427
            f = self._get_sftp().file(path, mode='rb')
413
428
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
702
717
            if (e.args[0].startswith('Directory not empty: ')
703
718
                or getattr(e, 'errno', None) == errno.ENOTEMPTY):
704
719
                raise errors.DirectoryNotEmpty(path, str(e))
705
 
            if e.args == ('Operation unsupported',):
706
 
                raise errors.TransportNotPossible()
707
720
            mutter('Raising exception with args %s', e.args)
708
721
        if getattr(e, 'errno', None) is not None:
709
722
            mutter('Raising exception with errno %s', e.errno)
799
812
        """Return the stat information for a file."""
800
813
        path = self._remote_path(relpath)
801
814
        try:
802
 
            return self._get_sftp().lstat(path)
 
815
            return self._get_sftp().stat(path)
803
816
        except (IOError, paramiko.SSHException), e:
804
817
            self._translate_io_exception(e, path, ': unable to stat')
805
818
 
806
 
    def readlink(self, relpath):
807
 
        """See Transport.readlink."""
808
 
        path = self._remote_path(relpath)
809
 
        try:
810
 
            return self._get_sftp().readlink(path)
811
 
        except (IOError, paramiko.SSHException), e:
812
 
            self._translate_io_exception(e, path, ': unable to readlink')
813
 
 
814
 
    def symlink(self, source, link_name):
815
 
        """See Transport.symlink."""
816
 
        try:
817
 
            conn = self._get_sftp()
818
 
            sftp_retval = conn.symlink(source, link_name)
819
 
            if SFTP_OK != sftp_retval:
820
 
                raise TransportError(
821
 
                    '%r: unable to create symlink to %r' % (link_name, source),
822
 
                    sftp_retval
823
 
                )
824
 
        except (IOError, paramiko.SSHException), e:
825
 
            self._translate_io_exception(e, link_name,
826
 
                                         ': unable to create symlink to %r' % (source))
827
 
 
828
819
    def lock_read(self, relpath):
829
820
        """
830
821
        Lock the given file for shared (read) access.
893
884
        else:
894
885
            return True
895
886
 
 
887
# ------------- server test implementation --------------
 
888
import threading
 
889
 
 
890
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
 
891
 
 
892
STUB_SERVER_KEY = """
 
893
-----BEGIN RSA PRIVATE KEY-----
 
894
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
895
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
896
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
897
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
898
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
899
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
900
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
901
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
902
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
903
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
904
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
905
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
906
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
907
-----END RSA PRIVATE KEY-----
 
908
"""
 
909
 
 
910
 
 
911
class SocketListener(threading.Thread):
 
912
 
 
913
    def __init__(self, callback):
 
914
        threading.Thread.__init__(self)
 
915
        self._callback = callback
 
916
        self._socket = socket.socket()
 
917
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
918
        self._socket.bind(('localhost', 0))
 
919
        self._socket.listen(1)
 
920
        self.host, self.port = self._socket.getsockname()[:2]
 
921
        self._stop_event = threading.Event()
 
922
 
 
923
    def stop(self):
 
924
        # called from outside this thread
 
925
        self._stop_event.set()
 
926
        # use a timeout here, because if the test fails, the server thread may
 
927
        # never notice the stop_event.
 
928
        self.join(5.0)
 
929
        self._socket.close()
 
930
 
 
931
    def run(self):
 
932
        while True:
 
933
            readable, writable_unused, exception_unused = \
 
934
                select.select([self._socket], [], [], 0.1)
 
935
            if self._stop_event.isSet():
 
936
                return
 
937
            if len(readable) == 0:
 
938
                continue
 
939
            try:
 
940
                s, addr_unused = self._socket.accept()
 
941
                # because the loopback socket is inline, and transports are
 
942
                # never explicitly closed, best to launch a new thread.
 
943
                threading.Thread(target=self._callback, args=(s,)).start()
 
944
            except socket.error, x:
 
945
                sys.excepthook(*sys.exc_info())
 
946
                warning('Socket error during accept() within unit test server'
 
947
                        ' thread: %r' % x)
 
948
            except Exception, x:
 
949
                # probably a failed test; unit test thread will log the
 
950
                # failure/error
 
951
                sys.excepthook(*sys.exc_info())
 
952
                warning('Exception from within unit test server thread: %r' %
 
953
                        x)
 
954
 
 
955
 
 
956
class SocketDelay(object):
 
957
    """A socket decorator to make TCP appear slower.
 
958
 
 
959
    This changes recv, send, and sendall to add a fixed latency to each python
 
960
    call if a new roundtrip is detected. That is, when a recv is called and the
 
961
    flag new_roundtrip is set, latency is charged. Every send and send_all
 
962
    sets this flag.
 
963
 
 
964
    In addition every send, sendall and recv sleeps a bit per character send to
 
965
    simulate bandwidth.
 
966
 
 
967
    Not all methods are implemented, this is deliberate as this class is not a
 
968
    replacement for the builtin sockets layer. fileno is not implemented to
 
969
    prevent the proxy being bypassed.
 
970
    """
 
971
 
 
972
    simulated_time = 0
 
973
    _proxied_arguments = dict.fromkeys([
 
974
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
 
975
        "setblocking", "setsockopt", "settimeout", "shutdown"])
 
976
 
 
977
    def __init__(self, sock, latency, bandwidth=1.0,
 
978
                 really_sleep=True):
 
979
        """
 
980
        :param bandwith: simulated bandwith (MegaBit)
 
981
        :param really_sleep: If set to false, the SocketDelay will just
 
982
        increase a counter, instead of calling time.sleep. This is useful for
 
983
        unittesting the SocketDelay.
 
984
        """
 
985
        self.sock = sock
 
986
        self.latency = latency
 
987
        self.really_sleep = really_sleep
 
988
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
 
989
        self.new_roundtrip = False
 
990
 
 
991
    def sleep(self, s):
 
992
        if self.really_sleep:
 
993
            time.sleep(s)
 
994
        else:
 
995
            SocketDelay.simulated_time += s
 
996
 
 
997
    def __getattr__(self, attr):
 
998
        if attr in SocketDelay._proxied_arguments:
 
999
            return getattr(self.sock, attr)
 
1000
        raise AttributeError("'SocketDelay' object has no attribute %r" %
 
1001
                             attr)
 
1002
 
 
1003
    def dup(self):
 
1004
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
 
1005
                           self._sleep)
 
1006
 
 
1007
    def recv(self, *args):
 
1008
        data = self.sock.recv(*args)
 
1009
        if data and self.new_roundtrip:
 
1010
            self.new_roundtrip = False
 
1011
            self.sleep(self.latency)
 
1012
        self.sleep(len(data) * self.time_per_byte)
 
1013
        return data
 
1014
 
 
1015
    def sendall(self, data, flags=0):
 
1016
        if not self.new_roundtrip:
 
1017
            self.new_roundtrip = True
 
1018
            self.sleep(self.latency)
 
1019
        self.sleep(len(data) * self.time_per_byte)
 
1020
        return self.sock.sendall(data, flags)
 
1021
 
 
1022
    def send(self, data, flags=0):
 
1023
        if not self.new_roundtrip:
 
1024
            self.new_roundtrip = True
 
1025
            self.sleep(self.latency)
 
1026
        bytes_sent = self.sock.send(data, flags)
 
1027
        self.sleep(bytes_sent * self.time_per_byte)
 
1028
        return bytes_sent
 
1029
 
 
1030
 
 
1031
class SFTPServer(Server):
 
1032
    """Common code for SFTP server facilities."""
 
1033
 
 
1034
    def __init__(self, server_interface=StubServer):
 
1035
        self._original_vendor = None
 
1036
        self._homedir = None
 
1037
        self._server_homedir = None
 
1038
        self._listener = None
 
1039
        self._root = None
 
1040
        self._vendor = ssh.ParamikoVendor()
 
1041
        self._server_interface = server_interface
 
1042
        # sftp server logs
 
1043
        self.logs = []
 
1044
        self.add_latency = 0
 
1045
 
 
1046
    def _get_sftp_url(self, path):
 
1047
        """Calculate an sftp url to this server for path."""
 
1048
        return 'sftp://foo:bar@%s:%d/%s' % (self._listener.host,
 
1049
                                            self._listener.port, path)
 
1050
 
 
1051
    def log(self, message):
 
1052
        """StubServer uses this to log when a new server is created."""
 
1053
        self.logs.append(message)
 
1054
 
 
1055
    def _run_server_entry(self, sock):
 
1056
        """Entry point for all implementations of _run_server.
 
1057
 
 
1058
        If self.add_latency is > 0.000001 then sock is given a latency adding
 
1059
        decorator.
 
1060
        """
 
1061
        if self.add_latency > 0.000001:
 
1062
            sock = SocketDelay(sock, self.add_latency)
 
1063
        return self._run_server(sock)
 
1064
 
 
1065
    def _run_server(self, s):
 
1066
        ssh_server = paramiko.Transport(s)
 
1067
        key_file = pathjoin(self._homedir, 'test_rsa.key')
 
1068
        f = open(key_file, 'w')
 
1069
        f.write(STUB_SERVER_KEY)
 
1070
        f.close()
 
1071
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
1072
        ssh_server.add_server_key(host_key)
 
1073
        server = self._server_interface(self)
 
1074
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
 
1075
                                         StubSFTPServer, root=self._root,
 
1076
                                         home=self._server_homedir)
 
1077
        event = threading.Event()
 
1078
        ssh_server.start_server(event, server)
 
1079
        event.wait(5.0)
 
1080
 
 
1081
    def setUp(self, backing_server=None):
 
1082
        # XXX: TODO: make sftpserver back onto backing_server rather than local
 
1083
        # disk.
 
1084
        if not (backing_server is None or
 
1085
                isinstance(backing_server, local.LocalURLServer)):
 
1086
            raise AssertionError(
 
1087
                "backing_server should not be %r, because this can only serve the "
 
1088
                "local current working directory." % (backing_server,))
 
1089
        self._original_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
 
1090
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._vendor
 
1091
        if sys.platform == 'win32':
 
1092
            # Win32 needs to use the UNICODE api
 
1093
            self._homedir = getcwd()
 
1094
        else:
 
1095
            # But Linux SFTP servers should just deal in bytestreams
 
1096
            self._homedir = os.getcwd()
 
1097
        if self._server_homedir is None:
 
1098
            self._server_homedir = self._homedir
 
1099
        self._root = '/'
 
1100
        if sys.platform == 'win32':
 
1101
            self._root = ''
 
1102
        self._listener = SocketListener(self._run_server_entry)
 
1103
        self._listener.setDaemon(True)
 
1104
        self._listener.start()
 
1105
 
 
1106
    def tearDown(self):
 
1107
        """See bzrlib.transport.Server.tearDown."""
 
1108
        self._listener.stop()
 
1109
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
 
1110
 
 
1111
    def get_bogus_url(self):
 
1112
        """See bzrlib.transport.Server.get_bogus_url."""
 
1113
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
 
1114
        # we bind a random socket, so that we get a guaranteed unused port
 
1115
        # we just never listen on that port
 
1116
        s = socket.socket()
 
1117
        s.bind(('localhost', 0))
 
1118
        return 'sftp://%s:%s/' % s.getsockname()
 
1119
 
 
1120
 
 
1121
class SFTPFullAbsoluteServer(SFTPServer):
 
1122
    """A test server for sftp transports, using absolute urls and ssh."""
 
1123
 
 
1124
    def get_url(self):
 
1125
        """See bzrlib.transport.Server.get_url."""
 
1126
        homedir = self._homedir
 
1127
        if sys.platform != 'win32':
 
1128
            # Remove the initial '/' on all platforms but win32
 
1129
            homedir = homedir[1:]
 
1130
        return self._get_sftp_url(urlutils.escape(homedir))
 
1131
 
 
1132
 
 
1133
class SFTPServerWithoutSSH(SFTPServer):
 
1134
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
 
1135
 
 
1136
    def __init__(self):
 
1137
        super(SFTPServerWithoutSSH, self).__init__()
 
1138
        self._vendor = ssh.LoopbackVendor()
 
1139
 
 
1140
    def _run_server(self, sock):
 
1141
        # Re-import these as locals, so that they're still accessible during
 
1142
        # interpreter shutdown (when all module globals get set to None, leading
 
1143
        # to confusing errors like "'NoneType' object has no attribute 'error'".
 
1144
        class FakeChannel(object):
 
1145
            def get_transport(self):
 
1146
                return self
 
1147
            def get_log_channel(self):
 
1148
                return 'paramiko'
 
1149
            def get_name(self):
 
1150
                return '1'
 
1151
            def get_hexdump(self):
 
1152
                return False
 
1153
            def close(self):
 
1154
                pass
 
1155
 
 
1156
        server = paramiko.SFTPServer(
 
1157
            FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
 
1158
            root=self._root, home=self._server_homedir)
 
1159
        try:
 
1160
            server.start_subsystem(
 
1161
                'sftp', None, ssh.SocketAsChannelAdapter(sock))
 
1162
        except socket.error, e:
 
1163
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
 
1164
                # it's okay for the client to disconnect abruptly
 
1165
                # (bug in paramiko 1.6: it should absorb this exception)
 
1166
                pass
 
1167
            else:
 
1168
                raise
 
1169
        except Exception, e:
 
1170
            # This typically seems to happen during interpreter shutdown, so
 
1171
            # most of the useful ways to report this error are won't work.
 
1172
            # Writing the exception type, and then the text of the exception,
 
1173
            # seems to be the best we can do.
 
1174
            import sys
 
1175
            sys.stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
 
1176
            sys.stderr.write('%s\n\n' % (e,))
 
1177
        server.finish_subsystem()
 
1178
 
 
1179
 
 
1180
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
 
1181
    """A test server for sftp transports, using absolute urls."""
 
1182
 
 
1183
    def get_url(self):
 
1184
        """See bzrlib.transport.Server.get_url."""
 
1185
        homedir = self._homedir
 
1186
        if sys.platform != 'win32':
 
1187
            # Remove the initial '/' on all platforms but win32
 
1188
            homedir = homedir[1:]
 
1189
        return self._get_sftp_url(urlutils.escape(homedir))
 
1190
 
 
1191
 
 
1192
class SFTPHomeDirServer(SFTPServerWithoutSSH):
 
1193
    """A test server for sftp transports, using homedir relative urls."""
 
1194
 
 
1195
    def get_url(self):
 
1196
        """See bzrlib.transport.Server.get_url."""
 
1197
        return self._get_sftp_url("~/")
 
1198
 
 
1199
 
 
1200
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
 
1201
    """A test server for sftp transports where only absolute paths will work.
 
1202
 
 
1203
    It does this by serving from a deeply-nested directory that doesn't exist.
 
1204
    """
 
1205
 
 
1206
    def setUp(self, backing_server=None):
 
1207
        self._server_homedir = '/dev/noone/runs/tests/here'
 
1208
        super(SFTPSiblingAbsoluteServer, self).setUp(backing_server)
 
1209
 
896
1210
 
897
1211
def get_test_permutations():
898
1212
    """Return the permutations to be used in testing."""
899
 
    from bzrlib.tests import stub_sftp
900
 
    return [(SFTPTransport, stub_sftp.SFTPAbsoluteServer),
901
 
            (SFTPTransport, stub_sftp.SFTPHomeDirServer),
902
 
            (SFTPTransport, stub_sftp.SFTPSiblingAbsoluteServer),
 
1213
    return [(SFTPTransport, SFTPAbsoluteServer),
 
1214
            (SFTPTransport, SFTPHomeDirServer),
 
1215
            (SFTPTransport, SFTPSiblingAbsoluteServer),
903
1216
            ]