~bzr-pqm/bzr/bzr.dev

1185.16.127 by Martin Pool
[patch] paramiko sftp tests (robey)
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
2
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
7
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU General Public License for more details.
12
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
17
"""
18
A stub SFTP server for loopback SFTP testing.
19
Adapted from the one in paramiko's unit tests.
20
"""
21
22
import os
23
from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \
24
    SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED
25
26
27
class StubServer(ServerInterface):
    """Permissive server policy for loopback testing.

    Every password login is accepted and every channel request is granted.
    """

    def check_auth_password(self, username, password):
        """Accept any username/password pair."""
        return AUTH_SUCCESSFUL

    def check_channel_request(self, kind, chanid):
        """Grant the channel request regardless of its kind or id."""
        return OPEN_SUCCEEDED
34
35
36
class StubSFTPHandle(SFTPHandle):
    """File handle used by the stub SFTP server.

    The methods below read two attributes that the code creating the handle
    must set: ``filename`` (path of the open file) and ``readfile`` (the
    open file object).
    """

    def stat(self):
        """Return the attributes of the open file.

        Returns an SFTPAttributes built from ``os.fstat`` on the descriptor
        of ``readfile``, or an SFTP error code if the fstat fails.
        """
        try:
            return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno()))
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def chattr(self, attr):
        """Apply ``attr`` to the open file.

        Returns SFTP_OK on success, or an SFTP error code if changing the
        attributes fails with an OSError.
        """
        # set_file_attr works on a path rather than a descriptor, so the
        # stored filename is used here.
        try:
            SFTPServer.set_file_attr(self.filename, attr)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        # Report success explicitly; falling off the end would hand None
        # back to the caller instead of a status code.
        return SFTP_OK
50
51
52
class StubSFTPServer(SFTPServerInterface):
    """SFTP server backend that serves files from a local directory.

    Every path received from the client is canonicalized and then resolved
    beneath ``root``.  Failures of the underlying OS calls are reported as
    SFTP error codes (via ``SFTPServer.convert_errno``) rather than raised.
    """

    def __init__(self, server, root):
        """Create a backend for ``server`` that serves files under ``root``."""
        SFTPServerInterface.__init__(self, server)
        self.root = root

    def _realpath(self, path):
        """Map a client-supplied path to a local path beneath the root."""
        return self.root + self.canonicalize(path)

    def list_folder(self, path):
        """Return a list of SFTPAttributes, one per entry of ``path``.

        Each entry's ``filename`` is set to its bare name.  If listing the
        directory or stat-ing any entry fails, an SFTP error code is
        returned instead of the list.
        """
        path = self._realpath(path)
        try:
            out = []
            for fname in os.listdir(path):
                attr = SFTPAttributes.from_stat(
                    os.stat(os.path.join(path, fname)))
                attr.filename = fname
                out.append(attr)
            return out
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def stat(self, path):
        """Return SFTPAttributes for ``path`` (following symlinks).

        Returns an SFTP error code if the stat fails.
        """
        path = self._realpath(path)
        try:
            return SFTPAttributes.from_stat(os.stat(path))
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def lstat(self, path):
        """Return SFTPAttributes for ``path`` without following symlinks.

        Returns an SFTP error code if the lstat fails.
        """
        path = self._realpath(path)
        try:
            return SFTPAttributes.from_stat(os.lstat(path))
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def open(self, path, flags, attr):
        """Open ``path`` and return a StubSFTPHandle, or an SFTP error code.

        ``flags`` are ``os.open`` flags.  ``attr``, when not None, is applied
        to the file if O_CREAT was requested.  The descriptor is closed
        again on every error path so a failed open does not leak it.
        """
        path = self._realpath(path)
        # SFTP carries raw bytes; on platforms that distinguish text from
        # binary files, ask for binary so nothing gets translated.
        flags |= getattr(os, 'O_BINARY', 0)
        try:
            # os.open() defaults to mode 0777, an odd default for regular
            # files; create with 0666 (still subject to the umask) instead.
            fd = os.open(path, flags, 0o666)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        if (flags & os.O_CREAT) and (attr is not None):
            try:
                SFTPServer.set_file_attr(path, attr)
            except OSError as e:
                os.close(fd)
                return SFTPServer.convert_errno(e.errno)
        if flags & os.O_WRONLY:
            fstr = 'wb'
        elif flags & os.O_RDWR:
            fstr = 'r+b'
        else:
            # O_RDONLY (== 0)
            fstr = 'rb'
        try:
            f = os.fdopen(fd, fstr)
        except (IOError, OSError) as e:
            # Best effort: the descriptor may or may not still be open,
            # depending on where fdopen failed.
            try:
                os.close(fd)
            except OSError:
                pass
            return SFTPServer.convert_errno(e.errno)
        fobj = StubSFTPHandle()
        fobj.filename = path
        fobj.readfile = f
        fobj.writefile = f
        return fobj

    def remove(self, path):
        """Delete the file at ``path``; return SFTP_OK or an error code."""
        path = self._realpath(path)
        try:
            os.remove(path)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def rename(self, oldpath, newpath):
        """Rename ``oldpath`` to ``newpath``; return SFTP_OK or an error code."""
        oldpath = self._realpath(oldpath)
        newpath = self._realpath(newpath)
        try:
            os.rename(oldpath, newpath)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def mkdir(self, path, attr):
        """Create directory ``path`` and apply ``attr`` when it is not None.

        Returns SFTP_OK, or an SFTP error code if creating the directory or
        setting its attributes fails.
        """
        path = self._realpath(path)
        try:
            os.mkdir(path)
            if attr is not None:
                SFTPServer.set_file_attr(path, attr)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def rmdir(self, path):
        """Remove directory ``path``; return SFTP_OK or an error code."""
        path = self._realpath(path)
        try:
            os.rmdir(path)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    # removed: chattr, symlink, readlink
    # (nothing in bzr's sftp transport uses those)