~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

(vila) Fix test failures blocking package builds. (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Implementation of Transport over SFTP, using paramiko."""
18
18
 
 
19
from __future__ import absolute_import
 
20
 
19
21
# TODO: Remove the transport-based lock_read and lock_write methods.  They'll
20
22
# then raise TransportNotPossible, which will break remote access to any
21
23
# formats which rely on OS-level locks.  That should be fine as those formats
28
30
import itertools
29
31
import os
30
32
import random
31
 
import select
32
 
import socket
33
33
import stat
34
34
import sys
35
35
import time
36
 
import urllib
37
 
import urlparse
38
36
import warnings
39
37
 
40
38
from bzrlib import (
44
42
    urlutils,
45
43
    )
46
44
from bzrlib.errors import (FileExists,
47
 
                           NoSuchFile, PathNotChild,
 
45
                           NoSuchFile,
48
46
                           TransportError,
49
47
                           LockError,
50
48
                           PathError,
51
49
                           ParamikoNotPresent,
52
50
                           )
53
 
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
54
 
from bzrlib.symbol_versioning import (
55
 
        deprecated_function,
56
 
        )
 
51
from bzrlib.osutils import fancy_rename
57
52
from bzrlib.trace import mutter, warning
58
53
from bzrlib.transport import (
59
54
    FileFileStream,
60
55
    _file_streams,
61
 
    local,
62
 
    Server,
63
56
    ssh,
64
57
    ConnectedTransport,
65
58
    )
84
77
else:
85
78
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
86
79
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
87
 
                               CMD_HANDLE, CMD_OPEN)
 
80
                               SFTP_OK, CMD_HANDLE, CMD_OPEN)
88
81
    from paramiko.sftp_attr import SFTPAttributes
89
82
    from paramiko.sftp_file import SFTPFile
90
83
 
116
109
        except FileExists:
117
110
            raise LockError('File %r already locked' % (self.path,))
118
111
 
119
 
    def __del__(self):
120
 
        """Should this warn, or actually try to cleanup?"""
121
 
        if self.lock_file:
122
 
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))
123
 
            self.unlock()
124
 
 
125
112
    def unlock(self):
126
113
        if not self.lock_file:
127
114
            return
283
270
                    buffered = buffered[buffered_offset:]
284
271
                    buffered_data = [buffered]
285
272
                    buffered_len = len(buffered)
 
273
        # now that the data stream is done, close the handle
 
274
        fp.close()
286
275
        if buffered_len:
287
276
            buffered = ''.join(buffered_data)
288
277
            del buffered_data[:]
343
332
    # up the request itself, rather than us having to worry about it
344
333
    _max_request_size = 32768
345
334
 
346
 
    def __init__(self, base, _from_transport=None):
347
 
        super(SFTPTransport, self).__init__(base,
348
 
                                            _from_transport=_from_transport)
349
 
 
350
335
    def _remote_path(self, relpath):
351
336
        """Return the path to be passed along the sftp protocol for relpath.
352
337
 
353
338
        :param relpath: is a urlencoded string.
354
339
        """
355
 
        relative = urlutils.unescape(relpath).encode('utf-8')
356
 
        remote_path = self._combine_paths(self._path, relative)
 
340
        remote_path = self._parsed_url.clone(relpath).path
357
341
        # the initial slash should be removed from the path, and treated as a
358
342
        # homedir relative path (the path begins with a double slash if it is
359
343
        # absolute).  see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
378
362
        in base url at transport creation time.
379
363
        """
380
364
        if credentials is None:
381
 
            password = self._password
 
365
            password = self._parsed_url.password
382
366
        else:
383
367
            password = credentials
384
368
 
385
369
        vendor = ssh._get_ssh_vendor()
386
 
        user = self._user
 
370
        user = self._parsed_url.user
387
371
        if user is None:
388
372
            auth = config.AuthenticationConfig()
389
 
            user = auth.get_user('ssh', self._host, self._port)
390
 
        connection = vendor.connect_sftp(self._user, password,
391
 
                                         self._host, self._port)
 
373
            user = auth.get_user('ssh', self._parsed_url.host,
 
374
                self._parsed_url.port)
 
375
        connection = vendor.connect_sftp(self._parsed_url.user, password,
 
376
            self._parsed_url.host, self._parsed_url.port)
392
377
        return connection, (user, password)
393
378
 
 
379
    def disconnect(self):
 
380
        connection = self._get_connection()
 
381
        if connection is not None:
 
382
            connection.close()
 
383
 
394
384
    def _get_sftp(self):
395
385
        """Ensures that a connection is established"""
396
386
        connection = self._get_connection()
418
408
        :param relpath: The relative path to the file
419
409
        """
420
410
        try:
421
 
            # FIXME: by returning the file directly, we don't pass this
422
 
            # through to report_activity.  We could try wrapping the object
423
 
            # before it's returned.  For readv and get_bytes it's handled in
424
 
            # the higher-level function.
425
 
            # -- mbp 20090126
426
411
            path = self._remote_path(relpath)
427
412
            f = self._get_sftp().file(path, mode='rb')
428
413
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
717
702
            if (e.args[0].startswith('Directory not empty: ')
718
703
                or getattr(e, 'errno', None) == errno.ENOTEMPTY):
719
704
                raise errors.DirectoryNotEmpty(path, str(e))
 
705
            if e.args == ('Operation unsupported',):
 
706
                raise errors.TransportNotPossible()
720
707
            mutter('Raising exception with args %s', e.args)
721
708
        if getattr(e, 'errno', None) is not None:
722
709
            mutter('Raising exception with errno %s', e.errno)
812
799
        """Return the stat information for a file."""
813
800
        path = self._remote_path(relpath)
814
801
        try:
815
 
            return self._get_sftp().stat(path)
 
802
            return self._get_sftp().lstat(path)
816
803
        except (IOError, paramiko.SSHException), e:
817
804
            self._translate_io_exception(e, path, ': unable to stat')
818
805
 
 
806
    def readlink(self, relpath):
 
807
        """See Transport.readlink."""
 
808
        path = self._remote_path(relpath)
 
809
        try:
 
810
            return self._get_sftp().readlink(path)
 
811
        except (IOError, paramiko.SSHException), e:
 
812
            self._translate_io_exception(e, path, ': unable to readlink')
 
813
 
 
814
    def symlink(self, source, link_name):
 
815
        """See Transport.symlink."""
 
816
        try:
 
817
            conn = self._get_sftp()
 
818
            sftp_retval = conn.symlink(source, link_name)
 
819
            if SFTP_OK != sftp_retval:
 
820
                raise TransportError(
 
821
                    '%r: unable to create symlink to %r' % (link_name, source),
 
822
                    sftp_retval
 
823
                )
 
824
        except (IOError, paramiko.SSHException), e:
 
825
            self._translate_io_exception(e, link_name,
 
826
                                         ': unable to create symlink to %r' % (source))
 
827
 
819
828
    def lock_read(self, relpath):
820
829
        """
821
830
        Lock the given file for shared (read) access.
884
893
        else:
885
894
            return True
886
895
 
887
 
# ------------- server test implementation --------------
888
 
import threading
889
 
 
890
 
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
891
 
 
892
 
STUB_SERVER_KEY = """
893
 
-----BEGIN RSA PRIVATE KEY-----
894
 
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
895
 
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
896
 
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
897
 
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
898
 
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
899
 
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
900
 
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
901
 
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
902
 
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
903
 
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
904
 
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
905
 
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
906
 
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
907
 
-----END RSA PRIVATE KEY-----
908
 
"""
909
 
 
910
 
 
911
 
class SocketListener(threading.Thread):
912
 
 
913
 
    def __init__(self, callback):
914
 
        threading.Thread.__init__(self)
915
 
        self._callback = callback
916
 
        self._socket = socket.socket()
917
 
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
918
 
        self._socket.bind(('localhost', 0))
919
 
        self._socket.listen(1)
920
 
        self.port = self._socket.getsockname()[1]
921
 
        self._stop_event = threading.Event()
922
 
 
923
 
    def stop(self):
924
 
        # called from outside this thread
925
 
        self._stop_event.set()
926
 
        # use a timeout here, because if the test fails, the server thread may
927
 
        # never notice the stop_event.
928
 
        self.join(5.0)
929
 
        self._socket.close()
930
 
 
931
 
    def run(self):
932
 
        while True:
933
 
            readable, writable_unused, exception_unused = \
934
 
                select.select([self._socket], [], [], 0.1)
935
 
            if self._stop_event.isSet():
936
 
                return
937
 
            if len(readable) == 0:
938
 
                continue
939
 
            try:
940
 
                s, addr_unused = self._socket.accept()
941
 
                # because the loopback socket is inline, and transports are
942
 
                # never explicitly closed, best to launch a new thread.
943
 
                threading.Thread(target=self._callback, args=(s,)).start()
944
 
            except socket.error, x:
945
 
                sys.excepthook(*sys.exc_info())
946
 
                warning('Socket error during accept() within unit test server'
947
 
                        ' thread: %r' % x)
948
 
            except Exception, x:
949
 
                # probably a failed test; unit test thread will log the
950
 
                # failure/error
951
 
                sys.excepthook(*sys.exc_info())
952
 
                warning('Exception from within unit test server thread: %r' %
953
 
                        x)
954
 
 
955
 
 
956
 
class SocketDelay(object):
957
 
    """A socket decorator to make TCP appear slower.
958
 
 
959
 
    This changes recv, send, and sendall to add a fixed latency to each python
960
 
    call if a new roundtrip is detected. That is, when a recv is called and the
961
 
    flag new_roundtrip is set, latency is charged. Every send and send_all
962
 
    sets this flag.
963
 
 
964
 
    In addition every send, sendall and recv sleeps a bit per character send to
965
 
    simulate bandwidth.
966
 
 
967
 
    Not all methods are implemented, this is deliberate as this class is not a
968
 
    replacement for the builtin sockets layer. fileno is not implemented to
969
 
    prevent the proxy being bypassed.
970
 
    """
971
 
 
972
 
    simulated_time = 0
973
 
    _proxied_arguments = dict.fromkeys([
974
 
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
975
 
        "setblocking", "setsockopt", "settimeout", "shutdown"])
976
 
 
977
 
    def __init__(self, sock, latency, bandwidth=1.0,
978
 
                 really_sleep=True):
979
 
        """
980
 
        :param bandwith: simulated bandwith (MegaBit)
981
 
        :param really_sleep: If set to false, the SocketDelay will just
982
 
        increase a counter, instead of calling time.sleep. This is useful for
983
 
        unittesting the SocketDelay.
984
 
        """
985
 
        self.sock = sock
986
 
        self.latency = latency
987
 
        self.really_sleep = really_sleep
988
 
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
989
 
        self.new_roundtrip = False
990
 
 
991
 
    def sleep(self, s):
992
 
        if self.really_sleep:
993
 
            time.sleep(s)
994
 
        else:
995
 
            SocketDelay.simulated_time += s
996
 
 
997
 
    def __getattr__(self, attr):
998
 
        if attr in SocketDelay._proxied_arguments:
999
 
            return getattr(self.sock, attr)
1000
 
        raise AttributeError("'SocketDelay' object has no attribute %r" %
1001
 
                             attr)
1002
 
 
1003
 
    def dup(self):
1004
 
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
1005
 
                           self._sleep)
1006
 
 
1007
 
    def recv(self, *args):
1008
 
        data = self.sock.recv(*args)
1009
 
        if data and self.new_roundtrip:
1010
 
            self.new_roundtrip = False
1011
 
            self.sleep(self.latency)
1012
 
        self.sleep(len(data) * self.time_per_byte)
1013
 
        return data
1014
 
 
1015
 
    def sendall(self, data, flags=0):
1016
 
        if not self.new_roundtrip:
1017
 
            self.new_roundtrip = True
1018
 
            self.sleep(self.latency)
1019
 
        self.sleep(len(data) * self.time_per_byte)
1020
 
        return self.sock.sendall(data, flags)
1021
 
 
1022
 
    def send(self, data, flags=0):
1023
 
        if not self.new_roundtrip:
1024
 
            self.new_roundtrip = True
1025
 
            self.sleep(self.latency)
1026
 
        bytes_sent = self.sock.send(data, flags)
1027
 
        self.sleep(bytes_sent * self.time_per_byte)
1028
 
        return bytes_sent
1029
 
 
1030
 
 
1031
 
class SFTPServer(Server):
1032
 
    """Common code for SFTP server facilities."""
1033
 
 
1034
 
    def __init__(self, server_interface=StubServer):
1035
 
        self._original_vendor = None
1036
 
        self._homedir = None
1037
 
        self._server_homedir = None
1038
 
        self._listener = None
1039
 
        self._root = None
1040
 
        self._vendor = ssh.ParamikoVendor()
1041
 
        self._server_interface = server_interface
1042
 
        # sftp server logs
1043
 
        self.logs = []
1044
 
        self.add_latency = 0
1045
 
 
1046
 
    def _get_sftp_url(self, path):
1047
 
        """Calculate an sftp url to this server for path."""
1048
 
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
1049
 
 
1050
 
    def log(self, message):
1051
 
        """StubServer uses this to log when a new server is created."""
1052
 
        self.logs.append(message)
1053
 
 
1054
 
    def _run_server_entry(self, sock):
1055
 
        """Entry point for all implementations of _run_server.
1056
 
 
1057
 
        If self.add_latency is > 0.000001 then sock is given a latency adding
1058
 
        decorator.
1059
 
        """
1060
 
        if self.add_latency > 0.000001:
1061
 
            sock = SocketDelay(sock, self.add_latency)
1062
 
        return self._run_server(sock)
1063
 
 
1064
 
    def _run_server(self, s):
1065
 
        ssh_server = paramiko.Transport(s)
1066
 
        key_file = pathjoin(self._homedir, 'test_rsa.key')
1067
 
        f = open(key_file, 'w')
1068
 
        f.write(STUB_SERVER_KEY)
1069
 
        f.close()
1070
 
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
1071
 
        ssh_server.add_server_key(host_key)
1072
 
        server = self._server_interface(self)
1073
 
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
1074
 
                                         StubSFTPServer, root=self._root,
1075
 
                                         home=self._server_homedir)
1076
 
        event = threading.Event()
1077
 
        ssh_server.start_server(event, server)
1078
 
        event.wait(5.0)
1079
 
 
1080
 
    def setUp(self, backing_server=None):
1081
 
        # XXX: TODO: make sftpserver back onto backing_server rather than local
1082
 
        # disk.
1083
 
        if not (backing_server is None or
1084
 
                isinstance(backing_server, local.LocalURLServer)):
1085
 
            raise AssertionError(
1086
 
                "backing_server should not be %r, because this can only serve the "
1087
 
                "local current working directory." % (backing_server,))
1088
 
        self._original_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
1089
 
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._vendor
1090
 
        if sys.platform == 'win32':
1091
 
            # Win32 needs to use the UNICODE api
1092
 
            self._homedir = getcwd()
1093
 
        else:
1094
 
            # But Linux SFTP servers should just deal in bytestreams
1095
 
            self._homedir = os.getcwd()
1096
 
        if self._server_homedir is None:
1097
 
            self._server_homedir = self._homedir
1098
 
        self._root = '/'
1099
 
        if sys.platform == 'win32':
1100
 
            self._root = ''
1101
 
        self._listener = SocketListener(self._run_server_entry)
1102
 
        self._listener.setDaemon(True)
1103
 
        self._listener.start()
1104
 
 
1105
 
    def tearDown(self):
1106
 
        """See bzrlib.transport.Server.tearDown."""
1107
 
        self._listener.stop()
1108
 
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
1109
 
 
1110
 
    def get_bogus_url(self):
1111
 
        """See bzrlib.transport.Server.get_bogus_url."""
1112
 
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
1113
 
        # we bind a random socket, so that we get a guaranteed unused port
1114
 
        # we just never listen on that port
1115
 
        s = socket.socket()
1116
 
        s.bind(('localhost', 0))
1117
 
        return 'sftp://%s:%s/' % s.getsockname()
1118
 
 
1119
 
 
1120
 
class SFTPFullAbsoluteServer(SFTPServer):
1121
 
    """A test server for sftp transports, using absolute urls and ssh."""
1122
 
 
1123
 
    def get_url(self):
1124
 
        """See bzrlib.transport.Server.get_url."""
1125
 
        homedir = self._homedir
1126
 
        if sys.platform != 'win32':
1127
 
            # Remove the initial '/' on all platforms but win32
1128
 
            homedir = homedir[1:]
1129
 
        return self._get_sftp_url(urlutils.escape(homedir))
1130
 
 
1131
 
 
1132
 
class SFTPServerWithoutSSH(SFTPServer):
1133
 
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
1134
 
 
1135
 
    def __init__(self):
1136
 
        super(SFTPServerWithoutSSH, self).__init__()
1137
 
        self._vendor = ssh.LoopbackVendor()
1138
 
 
1139
 
    def _run_server(self, sock):
1140
 
        # Re-import these as locals, so that they're still accessible during
1141
 
        # interpreter shutdown (when all module globals get set to None, leading
1142
 
        # to confusing errors like "'NoneType' object has no attribute 'error'".
1143
 
        class FakeChannel(object):
1144
 
            def get_transport(self):
1145
 
                return self
1146
 
            def get_log_channel(self):
1147
 
                return 'paramiko'
1148
 
            def get_name(self):
1149
 
                return '1'
1150
 
            def get_hexdump(self):
1151
 
                return False
1152
 
            def close(self):
1153
 
                pass
1154
 
 
1155
 
        server = paramiko.SFTPServer(
1156
 
            FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
1157
 
            root=self._root, home=self._server_homedir)
1158
 
        try:
1159
 
            server.start_subsystem(
1160
 
                'sftp', None, ssh.SocketAsChannelAdapter(sock))
1161
 
        except socket.error, e:
1162
 
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
1163
 
                # it's okay for the client to disconnect abruptly
1164
 
                # (bug in paramiko 1.6: it should absorb this exception)
1165
 
                pass
1166
 
            else:
1167
 
                raise
1168
 
        except Exception, e:
1169
 
            # This typically seems to happen during interpreter shutdown, so
1170
 
            # most of the useful ways to report this error are won't work.
1171
 
            # Writing the exception type, and then the text of the exception,
1172
 
            # seems to be the best we can do.
1173
 
            import sys
1174
 
            sys.stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
1175
 
            sys.stderr.write('%s\n\n' % (e,))
1176
 
        server.finish_subsystem()
1177
 
 
1178
 
 
1179
 
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
1180
 
    """A test server for sftp transports, using absolute urls."""
1181
 
 
1182
 
    def get_url(self):
1183
 
        """See bzrlib.transport.Server.get_url."""
1184
 
        homedir = self._homedir
1185
 
        if sys.platform != 'win32':
1186
 
            # Remove the initial '/' on all platforms but win32
1187
 
            homedir = homedir[1:]
1188
 
        return self._get_sftp_url(urlutils.escape(homedir))
1189
 
 
1190
 
 
1191
 
class SFTPHomeDirServer(SFTPServerWithoutSSH):
1192
 
    """A test server for sftp transports, using homedir relative urls."""
1193
 
 
1194
 
    def get_url(self):
1195
 
        """See bzrlib.transport.Server.get_url."""
1196
 
        return self._get_sftp_url("~/")
1197
 
 
1198
 
 
1199
 
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
1200
 
    """A test server for sftp transports where only absolute paths will work.
1201
 
 
1202
 
    It does this by serving from a deeply-nested directory that doesn't exist.
1203
 
    """
1204
 
 
1205
 
    def setUp(self, backing_server=None):
1206
 
        self._server_homedir = '/dev/noone/runs/tests/here'
1207
 
        super(SFTPSiblingAbsoluteServer, self).setUp(backing_server)
1208
 
 
1209
896
 
1210
897
def get_test_permutations():
1211
898
    """Return the permutations to be used in testing."""
1212
 
    return [(SFTPTransport, SFTPAbsoluteServer),
1213
 
            (SFTPTransport, SFTPHomeDirServer),
1214
 
            (SFTPTransport, SFTPSiblingAbsoluteServer),
 
899
    from bzrlib.tests import stub_sftp
 
900
    return [(SFTPTransport, stub_sftp.SFTPAbsoluteServer),
 
901
            (SFTPTransport, stub_sftp.SFTPHomeDirServer),
 
902
            (SFTPTransport, stub_sftp.SFTPSiblingAbsoluteServer),
1215
903
            ]