~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: Jelmer Vernooij
  • Date: 2009-04-10 15:58:09 UTC
  • mto: This revision was merged to the branch mainline in revision 4284.
  • Revision ID: jelmer@samba.org-20090410155809-kdibzcjvp7pdb83f
Fix missing import.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
28
28
import itertools
29
29
import os
30
30
import random
 
31
import select
 
32
import socket
31
33
import stat
32
34
import sys
33
35
import time
82
84
else:
83
85
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
84
86
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
85
 
                               SFTP_OK, CMD_HANDLE, CMD_OPEN)
 
87
                               CMD_HANDLE, CMD_OPEN)
86
88
    from paramiko.sftp_attr import SFTPAttributes
87
89
    from paramiko.sftp_file import SFTPFile
88
90
 
114
116
        except FileExists:
115
117
            raise LockError('File %r already locked' % (self.path,))
116
118
 
 
119
    def __del__(self):
 
120
        """Should this warn, or actually try to cleanup?"""
 
121
        if self.lock_file:
 
122
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))
 
123
            self.unlock()
 
124
 
117
125
    def unlock(self):
118
126
        if not self.lock_file:
119
127
            return
275
283
                    buffered = buffered[buffered_offset:]
276
284
                    buffered_data = [buffered]
277
285
                    buffered_len = len(buffered)
278
 
        # now that the data stream is done, close the handle
279
 
        fp.close()
280
286
        if buffered_len:
281
287
            buffered = ''.join(buffered_data)
282
288
            del buffered_data[:]
385
391
                                         self._host, self._port)
386
392
        return connection, (user, password)
387
393
 
388
 
    def disconnect(self):
389
 
        connection = self._get_connection()
390
 
        if connection is not None:
391
 
            connection.close()
392
 
 
393
394
    def _get_sftp(self):
394
395
        """Ensures that a connection is established"""
395
396
        connection = self._get_connection()
417
418
        :param relpath: The relative path to the file
418
419
        """
419
420
        try:
 
421
            # FIXME: by returning the file directly, we don't pass this
 
422
            # through to report_activity.  We could try wrapping the object
 
423
            # before it's returned.  For readv and get_bytes it's handled in
 
424
            # the higher-level function.
 
425
            # -- mbp 20090126
420
426
            path = self._remote_path(relpath)
421
427
            f = self._get_sftp().file(path, mode='rb')
422
428
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
705
711
            # strange but true, for the paramiko server.
706
712
            if (e.args == ('Failure',)):
707
713
                raise failure_exc(path, str(e) + more_info)
708
 
            # Can be something like args = ('Directory not empty:
709
 
            # '/srv/bazaar.launchpad.net/blah...: '
710
 
            # [Errno 39] Directory not empty',)
711
 
            if (e.args[0].startswith('Directory not empty: ')
712
 
                or getattr(e, 'errno', None) == errno.ENOTEMPTY):
713
 
                raise errors.DirectoryNotEmpty(path, str(e))
714
 
            if e.args == ('Operation unsupported',):
715
 
                raise errors.TransportNotPossible()
716
714
            mutter('Raising exception with args %s', e.args)
717
715
        if getattr(e, 'errno', None) is not None:
718
716
            mutter('Raising exception with errno %s', e.errno)
808
806
        """Return the stat information for a file."""
809
807
        path = self._remote_path(relpath)
810
808
        try:
811
 
            return self._get_sftp().lstat(path)
 
809
            return self._get_sftp().stat(path)
812
810
        except (IOError, paramiko.SSHException), e:
813
811
            self._translate_io_exception(e, path, ': unable to stat')
814
812
 
815
 
    def readlink(self, relpath):
816
 
        """See Transport.readlink."""
817
 
        path = self._remote_path(relpath)
818
 
        try:
819
 
            return self._get_sftp().readlink(path)
820
 
        except (IOError, paramiko.SSHException), e:
821
 
            self._translate_io_exception(e, path, ': unable to readlink')
822
 
 
823
 
    def symlink(self, source, link_name):
824
 
        """See Transport.symlink."""
825
 
        try:
826
 
            conn = self._get_sftp()
827
 
            sftp_retval = conn.symlink(source, link_name)
828
 
            if SFTP_OK != sftp_retval:
829
 
                raise TransportError(
830
 
                    '%r: unable to create symlink to %r' % (link_name, source),
831
 
                    sftp_retval
832
 
                )
833
 
        except (IOError, paramiko.SSHException), e:
834
 
            self._translate_io_exception(e, link_name,
835
 
                                         ': unable to create symlink to %r' % (source))
836
 
 
837
813
    def lock_read(self, relpath):
838
814
        """
839
815
        Lock the given file for shared (read) access.
902
878
        else:
903
879
            return True
904
880
 
 
881
# ------------- server test implementation --------------
 
882
import threading
 
883
 
 
884
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
 
885
 
 
886
STUB_SERVER_KEY = """
 
887
-----BEGIN RSA PRIVATE KEY-----
 
888
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
889
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
890
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
891
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
892
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
893
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
894
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
895
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
896
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
897
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
898
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
899
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
900
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
901
-----END RSA PRIVATE KEY-----
 
902
"""
 
903
 
 
904
 
 
905
class SocketListener(threading.Thread):
 
906
 
 
907
    def __init__(self, callback):
 
908
        threading.Thread.__init__(self)
 
909
        self._callback = callback
 
910
        self._socket = socket.socket()
 
911
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
912
        self._socket.bind(('localhost', 0))
 
913
        self._socket.listen(1)
 
914
        self.port = self._socket.getsockname()[1]
 
915
        self._stop_event = threading.Event()
 
916
 
 
917
    def stop(self):
 
918
        # called from outside this thread
 
919
        self._stop_event.set()
 
920
        # use a timeout here, because if the test fails, the server thread may
 
921
        # never notice the stop_event.
 
922
        self.join(5.0)
 
923
        self._socket.close()
 
924
 
 
925
    def run(self):
 
926
        while True:
 
927
            readable, writable_unused, exception_unused = \
 
928
                select.select([self._socket], [], [], 0.1)
 
929
            if self._stop_event.isSet():
 
930
                return
 
931
            if len(readable) == 0:
 
932
                continue
 
933
            try:
 
934
                s, addr_unused = self._socket.accept()
 
935
                # because the loopback socket is inline, and transports are
 
936
                # never explicitly closed, best to launch a new thread.
 
937
                threading.Thread(target=self._callback, args=(s,)).start()
 
938
            except socket.error, x:
 
939
                sys.excepthook(*sys.exc_info())
 
940
                warning('Socket error during accept() within unit test server'
 
941
                        ' thread: %r' % x)
 
942
            except Exception, x:
 
943
                # probably a failed test; unit test thread will log the
 
944
                # failure/error
 
945
                sys.excepthook(*sys.exc_info())
 
946
                warning('Exception from within unit test server thread: %r' %
 
947
                        x)
 
948
 
 
949
 
 
950
class SocketDelay(object):
 
951
    """A socket decorator to make TCP appear slower.
 
952
 
 
953
    This changes recv, send, and sendall to add a fixed latency to each python
 
954
    call if a new roundtrip is detected. That is, when a recv is called and the
 
955
    flag new_roundtrip is set, latency is charged. Every send and send_all
 
956
    sets this flag.
 
957
 
 
958
    In addition every send, sendall and recv sleeps a bit per character send to
 
959
    simulate bandwidth.
 
960
 
 
961
    Not all methods are implemented, this is deliberate as this class is not a
 
962
    replacement for the builtin sockets layer. fileno is not implemented to
 
963
    prevent the proxy being bypassed.
 
964
    """
 
965
 
 
966
    simulated_time = 0
 
967
    _proxied_arguments = dict.fromkeys([
 
968
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
 
969
        "setblocking", "setsockopt", "settimeout", "shutdown"])
 
970
 
 
971
    def __init__(self, sock, latency, bandwidth=1.0,
 
972
                 really_sleep=True):
 
973
        """
 
974
        :param bandwith: simulated bandwith (MegaBit)
 
975
        :param really_sleep: If set to false, the SocketDelay will just
 
976
        increase a counter, instead of calling time.sleep. This is useful for
 
977
        unittesting the SocketDelay.
 
978
        """
 
979
        self.sock = sock
 
980
        self.latency = latency
 
981
        self.really_sleep = really_sleep
 
982
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
 
983
        self.new_roundtrip = False
 
984
 
 
985
    def sleep(self, s):
 
986
        if self.really_sleep:
 
987
            time.sleep(s)
 
988
        else:
 
989
            SocketDelay.simulated_time += s
 
990
 
 
991
    def __getattr__(self, attr):
 
992
        if attr in SocketDelay._proxied_arguments:
 
993
            return getattr(self.sock, attr)
 
994
        raise AttributeError("'SocketDelay' object has no attribute %r" %
 
995
                             attr)
 
996
 
 
997
    def dup(self):
 
998
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
 
999
                           self._sleep)
 
1000
 
 
1001
    def recv(self, *args):
 
1002
        data = self.sock.recv(*args)
 
1003
        if data and self.new_roundtrip:
 
1004
            self.new_roundtrip = False
 
1005
            self.sleep(self.latency)
 
1006
        self.sleep(len(data) * self.time_per_byte)
 
1007
        return data
 
1008
 
 
1009
    def sendall(self, data, flags=0):
 
1010
        if not self.new_roundtrip:
 
1011
            self.new_roundtrip = True
 
1012
            self.sleep(self.latency)
 
1013
        self.sleep(len(data) * self.time_per_byte)
 
1014
        return self.sock.sendall(data, flags)
 
1015
 
 
1016
    def send(self, data, flags=0):
 
1017
        if not self.new_roundtrip:
 
1018
            self.new_roundtrip = True
 
1019
            self.sleep(self.latency)
 
1020
        bytes_sent = self.sock.send(data, flags)
 
1021
        self.sleep(bytes_sent * self.time_per_byte)
 
1022
        return bytes_sent
 
1023
 
 
1024
 
 
1025
class SFTPServer(Server):
 
1026
    """Common code for SFTP server facilities."""
 
1027
 
 
1028
    def __init__(self, server_interface=StubServer):
 
1029
        self._original_vendor = None
 
1030
        self._homedir = None
 
1031
        self._server_homedir = None
 
1032
        self._listener = None
 
1033
        self._root = None
 
1034
        self._vendor = ssh.ParamikoVendor()
 
1035
        self._server_interface = server_interface
 
1036
        # sftp server logs
 
1037
        self.logs = []
 
1038
        self.add_latency = 0
 
1039
 
 
1040
    def _get_sftp_url(self, path):
 
1041
        """Calculate an sftp url to this server for path."""
 
1042
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
 
1043
 
 
1044
    def log(self, message):
 
1045
        """StubServer uses this to log when a new server is created."""
 
1046
        self.logs.append(message)
 
1047
 
 
1048
    def _run_server_entry(self, sock):
 
1049
        """Entry point for all implementations of _run_server.
 
1050
 
 
1051
        If self.add_latency is > 0.000001 then sock is given a latency adding
 
1052
        decorator.
 
1053
        """
 
1054
        if self.add_latency > 0.000001:
 
1055
            sock = SocketDelay(sock, self.add_latency)
 
1056
        return self._run_server(sock)
 
1057
 
 
1058
    def _run_server(self, s):
 
1059
        ssh_server = paramiko.Transport(s)
 
1060
        key_file = pathjoin(self._homedir, 'test_rsa.key')
 
1061
        f = open(key_file, 'w')
 
1062
        f.write(STUB_SERVER_KEY)
 
1063
        f.close()
 
1064
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
1065
        ssh_server.add_server_key(host_key)
 
1066
        server = self._server_interface(self)
 
1067
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
 
1068
                                         StubSFTPServer, root=self._root,
 
1069
                                         home=self._server_homedir)
 
1070
        event = threading.Event()
 
1071
        ssh_server.start_server(event, server)
 
1072
        event.wait(5.0)
 
1073
 
 
1074
    def setUp(self, backing_server=None):
 
1075
        # XXX: TODO: make sftpserver back onto backing_server rather than local
 
1076
        # disk.
 
1077
        if not (backing_server is None or
 
1078
                isinstance(backing_server, local.LocalURLServer)):
 
1079
            raise AssertionError(
 
1080
                "backing_server should not be %r, because this can only serve the "
 
1081
                "local current working directory." % (backing_server,))
 
1082
        self._original_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
 
1083
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._vendor
 
1084
        if sys.platform == 'win32':
 
1085
            # Win32 needs to use the UNICODE api
 
1086
            self._homedir = getcwd()
 
1087
        else:
 
1088
            # But Linux SFTP servers should just deal in bytestreams
 
1089
            self._homedir = os.getcwd()
 
1090
        if self._server_homedir is None:
 
1091
            self._server_homedir = self._homedir
 
1092
        self._root = '/'
 
1093
        if sys.platform == 'win32':
 
1094
            self._root = ''
 
1095
        self._listener = SocketListener(self._run_server_entry)
 
1096
        self._listener.setDaemon(True)
 
1097
        self._listener.start()
 
1098
 
 
1099
    def tearDown(self):
 
1100
        """See bzrlib.transport.Server.tearDown."""
 
1101
        self._listener.stop()
 
1102
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
 
1103
 
 
1104
    def get_bogus_url(self):
 
1105
        """See bzrlib.transport.Server.get_bogus_url."""
 
1106
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
 
1107
        # we bind a random socket, so that we get a guaranteed unused port
 
1108
        # we just never listen on that port
 
1109
        s = socket.socket()
 
1110
        s.bind(('localhost', 0))
 
1111
        return 'sftp://%s:%s/' % s.getsockname()
 
1112
 
 
1113
 
 
1114
class SFTPFullAbsoluteServer(SFTPServer):
 
1115
    """A test server for sftp transports, using absolute urls and ssh."""
 
1116
 
 
1117
    def get_url(self):
 
1118
        """See bzrlib.transport.Server.get_url."""
 
1119
        homedir = self._homedir
 
1120
        if sys.platform != 'win32':
 
1121
            # Remove the initial '/' on all platforms but win32
 
1122
            homedir = homedir[1:]
 
1123
        return self._get_sftp_url(urlutils.escape(homedir))
 
1124
 
 
1125
 
 
1126
class SFTPServerWithoutSSH(SFTPServer):
 
1127
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
 
1128
 
 
1129
    def __init__(self):
 
1130
        super(SFTPServerWithoutSSH, self).__init__()
 
1131
        self._vendor = ssh.LoopbackVendor()
 
1132
 
 
1133
    def _run_server(self, sock):
 
1134
        # Re-import these as locals, so that they're still accessible during
 
1135
        # interpreter shutdown (when all module globals get set to None, leading
 
1136
        # to confusing errors like "'NoneType' object has no attribute 'error'".
 
1137
        class FakeChannel(object):
 
1138
            def get_transport(self):
 
1139
                return self
 
1140
            def get_log_channel(self):
 
1141
                return 'paramiko'
 
1142
            def get_name(self):
 
1143
                return '1'
 
1144
            def get_hexdump(self):
 
1145
                return False
 
1146
            def close(self):
 
1147
                pass
 
1148
 
 
1149
        server = paramiko.SFTPServer(
 
1150
            FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
 
1151
            root=self._root, home=self._server_homedir)
 
1152
        try:
 
1153
            server.start_subsystem(
 
1154
                'sftp', None, ssh.SocketAsChannelAdapter(sock))
 
1155
        except socket.error, e:
 
1156
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
 
1157
                # it's okay for the client to disconnect abruptly
 
1158
                # (bug in paramiko 1.6: it should absorb this exception)
 
1159
                pass
 
1160
            else:
 
1161
                raise
 
1162
        except Exception, e:
 
1163
            # This typically seems to happen during interpreter shutdown, so
 
1164
            # most of the useful ways to report this error are won't work.
 
1165
            # Writing the exception type, and then the text of the exception,
 
1166
            # seems to be the best we can do.
 
1167
            import sys
 
1168
            sys.stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
 
1169
            sys.stderr.write('%s\n\n' % (e,))
 
1170
        server.finish_subsystem()
 
1171
 
 
1172
 
 
1173
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
 
1174
    """A test server for sftp transports, using absolute urls."""
 
1175
 
 
1176
    def get_url(self):
 
1177
        """See bzrlib.transport.Server.get_url."""
 
1178
        homedir = self._homedir
 
1179
        if sys.platform != 'win32':
 
1180
            # Remove the initial '/' on all platforms but win32
 
1181
            homedir = homedir[1:]
 
1182
        return self._get_sftp_url(urlutils.escape(homedir))
 
1183
 
 
1184
 
 
1185
class SFTPHomeDirServer(SFTPServerWithoutSSH):
 
1186
    """A test server for sftp transports, using homedir relative urls."""
 
1187
 
 
1188
    def get_url(self):
 
1189
        """See bzrlib.transport.Server.get_url."""
 
1190
        return self._get_sftp_url("~/")
 
1191
 
 
1192
 
 
1193
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
 
1194
    """A test server for sftp transports where only absolute paths will work.
 
1195
 
 
1196
    It does this by serving from a deeply-nested directory that doesn't exist.
 
1197
    """
 
1198
 
 
1199
    def setUp(self, backing_server=None):
 
1200
        self._server_homedir = '/dev/noone/runs/tests/here'
 
1201
        super(SFTPSiblingAbsoluteServer, self).setUp(backing_server)
 
1202
 
905
1203
 
906
1204
def get_test_permutations():
907
1205
    """Return the permutations to be used in testing."""
908
 
    from bzrlib.tests import stub_sftp
909
 
    return [(SFTPTransport, stub_sftp.SFTPAbsoluteServer),
910
 
            (SFTPTransport, stub_sftp.SFTPHomeDirServer),
911
 
            (SFTPTransport, stub_sftp.SFTPSiblingAbsoluteServer),
 
1206
    return [(SFTPTransport, SFTPAbsoluteServer),
 
1207
            (SFTPTransport, SFTPHomeDirServer),
 
1208
            (SFTPTransport, SFTPSiblingAbsoluteServer),
912
1209
            ]