~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/ftp_server/pyftpdlib_based.py

  • Committer: Vincent Ladeuil
  • Date: 2009-03-19 14:57:51 UTC
  • mfrom: (3508.1.24 pyftpdlib)
  • mto: This revision was merged to the branch mainline in revision 4168.
  • Revision ID: v.ladeuil+lp@free.fr-20090319145751-6vg1h5oo1jox3wsw
FTP test server: disable medusa for python2.6, add pyftpdlib

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
"""
 
17
FTP test server.
 
18
 
 
19
Based on pyftpdlib: http://code.google.com/p/pyftpdlib/
 
20
"""
 
21
 
 
22
import errno
 
23
import os
 
24
from pyftpdlib import ftpserver
 
25
import select
 
26
import threading
 
27
 
 
28
 
 
29
from bzrlib import (
 
30
    osutils,
 
31
    trace,
 
32
    transport,
 
33
    )
 
34
 
 
35
 
 
36
class AnonymousWithWriteAccessAuthorizer(ftpserver.DummyAuthorizer):
 
37
 
 
38
    def _check_permissions(self, username, perm):
 
39
        # Like base implementation but don't warn about write permissions
 
40
        # assigned to anonymous, since that's exactly our purpose.
 
41
        for p in perm:
 
42
            if p not in self.read_perms + self.write_perms:
 
43
                raise ftpserver.AuthorizerError('No such permission "%s"' %p)
 
44
 
 
45
 
 
46
class BzrConformingFS(ftpserver.AbstractedFS):
 
47
 
 
48
    def chmod(self, path, mode):
 
49
        return os.chmod(path, mode)
 
50
 
 
51
    def listdir(self, path):
 
52
        """List the content of a directory."""
 
53
        # FIXME: need tests with unicode paths
 
54
        return [osutils.safe_utf8(s) for s in os.listdir(path)]
 
55
 
 
56
    def fs2ftp(self, fspath):
 
57
        p = ftpserver.AbstractedFS.fs2ftp(self, fspath)
 
58
        # FIXME: need tests with unicode paths
 
59
        return osutils.safe_utf8(p)
 
60
 
 
61
 
 
62
class BzrConformingFTPHandler(ftpserver.FTPHandler):
 
63
 
 
64
    abstracted_fs = BzrConformingFS
 
65
 
 
66
    def __init__(self, conn, server):
 
67
        ftpserver.FTPHandler.__init__(self, conn, server)
 
68
        self.authorizer = server.authorizer
 
69
 
 
70
    def ftp_SIZE(self, path):
 
71
        # bzr is overly picky here, but we want to make the test suite pass
 
72
        # first. This may need to be revisited -- vila 20090226
 
73
        line = self.fs.fs2ftp(path)
 
74
        if self.fs.isdir(self.fs.realpath(path)):
 
75
            why = "%s is a directory" % line
 
76
            self.log('FAIL SIZE "%s". %s.' % (line, why))
 
77
            self.respond("550 %s."  %why)
 
78
        else:
 
79
            ftpserver.FTPHandler.ftp_SIZE(self, path)
 
80
 
 
81
    def ftp_NLST(self, path):
 
82
        # bzr is overly picky here, but we want to make the test suite pass
 
83
        # first. This may need to be revisited -- vila 20090226
 
84
        line = self.fs.fs2ftp(path)
 
85
        if self.fs.isfile(self.fs.realpath(path)):
 
86
            why = "Not a directory: %s" % line
 
87
            self.log('FAIL SIZE "%s". %s.' % (line, why))
 
88
            self.respond("550 %s."  %why)
 
89
        else:
 
90
            ftpserver.FTPHandler.ftp_NLST(self, path)
 
91
 
 
92
    def ftp_SITE_CHMOD(self, line):
 
93
        try:
 
94
            mode, path = line.split(None, 1)
 
95
            mode = int(mode, 8)
 
96
        except ValueError:
 
97
            # We catch both malformed line and malformed mode with the same
 
98
            # ValueError.
 
99
            self.respond("500 'SITE CHMOD %s': command not understood."
 
100
                         % line)
 
101
            self.log('FAIL SITE CHMOD ' % line)
 
102
            return
 
103
        ftp_path = self.fs.fs2ftp(path)
 
104
        try:
 
105
            self.run_as_current_user(self.fs.chmod, self.fs.ftp2fs(path), mode)
 
106
        except OSError, err:
 
107
            why = ftpserver._strerror(err)
 
108
            self.log('FAIL SITE CHMOD 0%03o "%s". %s.' % (mode, ftp_path, why))
 
109
            self.respond('550 %s.' % why)
 
110
        else:
 
111
            self.log('OK SITE CHMOD 0%03o "%s".' % (mode, ftp_path))
 
112
            self.respond('200 SITE CHMOD succesful.')
 
113
 
 
114
 
 
115
# pyftpdlib says to define SITE commands by declaring ftp_SITE_<CMD> methods,
 
116
# but fails to recognize them.
 
117
ftpserver.proto_cmds['SITE CHMOD'] = ftpserver._CommandProperty(
 
118
    perm='w', # Best fit choice even if not exactly right (can be d, f or m too)
 
119
    auth_needed=True, arg_needed=True, check_path=False,
 
120
    help='Syntax: SITE CHMOD <SP>  octal_mode_bits file-name (chmod file)',
 
121
    )
 
122
# An empty password is valid, hence the arg is neither mandatory not forbidden
 
123
ftpserver.proto_cmds['PASS'].arg_needed = None
 
124
 
 
125
 
 
126
class ftp_server(ftpserver.FTPServer):
 
127
 
 
128
    def __init__(self, address, handler, authorizer):
 
129
        ftpserver.FTPServer.__init__(self, address, handler)
 
130
        self.authorizer = authorizer
 
131
        # Worth backporting upstream ?
 
132
        self.addr = self.socket.getsockname()
 
133
 
 
134
 
 
135
class FTPTestServer(transport.Server):
 
136
    """Common code for FTP server facilities."""
 
137
 
 
138
    def __init__(self):
 
139
        self._root = None
 
140
        self._ftp_server = None
 
141
        self._port = None
 
142
        self._async_thread = None
 
143
        # ftp server logs
 
144
        self.logs = []
 
145
        self._ftpd_running = False
 
146
 
 
147
    def get_url(self):
 
148
        """Calculate an ftp url to this server."""
 
149
        return 'ftp://anonymous@localhost:%d/' % (self._port)
 
150
 
 
151
    def get_bogus_url(self):
 
152
        """Return a URL which cannot be connected to."""
 
153
        return 'ftp://127.0.0.1:1/'
 
154
 
 
155
    def log(self, message):
 
156
        """This is used by ftp_server to log connections, etc."""
 
157
        self.logs.append(message)
 
158
 
 
159
    def setUp(self, vfs_server=None):
 
160
        from bzrlib.transport.local import LocalURLServer
 
161
        if not (vfs_server is None or isinstance(vfs_server, LocalURLServer)):
 
162
            raise AssertionError(
 
163
                "FTPServer currently assumes local transport, got %s"
 
164
                % vfs_server)
 
165
        self._root = os.getcwdu()
 
166
 
 
167
        address = ('localhost', 0) # bind to a random port
 
168
        authorizer = AnonymousWithWriteAccessAuthorizer()
 
169
        authorizer.add_anonymous(self._root, perm='elradfmw')
 
170
        self._ftp_server = ftp_server(address, BzrConformingFTPHandler,
 
171
                                      authorizer)
 
172
        # This is hacky as hell, will not work if we need two servers working
 
173
        # at the same time, but that's the best we can do so far...
 
174
        # FIXME: At least log and logline could be overriden in the handler ?
 
175
        # -- vila 20090227
 
176
        ftpserver.log = self.log
 
177
        ftpserver.logline = self.log
 
178
        ftpserver.logerror = self.log
 
179
 
 
180
        self._port = self._ftp_server.socket.getsockname()[1]
 
181
        self._ftpd_starting = threading.Lock()
 
182
        self._ftpd_starting.acquire() # So it can be released by the server
 
183
        self._ftpd_thread = threading.Thread(
 
184
                target=self._run_server,)
 
185
        self._ftpd_thread.start()
 
186
        # Wait for the server thread to start (i.e release the lock)
 
187
        self._ftpd_starting.acquire()
 
188
        self._ftpd_starting.release()
 
189
 
 
190
    def tearDown(self):
 
191
        """See bzrlib.transport.Server.tearDown."""
 
192
        # Tell the server to stop, but also close the server socket for tests
 
193
        # that start the server but never initiate a connection. Closing the
 
194
        # socket should be done first though, to avoid further connections.
 
195
        self._ftp_server.close()
 
196
        self._ftpd_running = False
 
197
        self._ftpd_thread.join()
 
198
 
 
199
    def _run_server(self):
 
200
        """Run the server until tearDown is called, shut it down properly then.
 
201
        """
 
202
        self._ftpd_running = True
 
203
        self._ftpd_starting.release()
 
204
        while self._ftpd_running:
 
205
            self._ftp_server.serve_forever(timeout=0.1, count=1)
 
206
        self._ftp_server.close_all(ignore_all=True)
 
207
 
 
208
    def add_user(self, user, password):
 
209
        """Add a user with write access."""
 
210
        self._ftp_server.authorizer.add_user(user, password, self._root,
 
211
                                             perm='elradfmw')