~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/ftp_server.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-05-01 11:25:12 UTC
  • mfrom: (3211.7.10 protocol-v3-doc)
  • Revision ID: pqm@pqm.ubuntu.com-20080501112512-b9lgs4w8r43evtn1
Add the smart protocol v3 specification to network-protocol.txt

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2007-2010 Canonical Ltd
 
1
# Copyright (C) 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
"""
17
17
FTP test server.
18
18
 
33
33
from bzrlib import (
34
34
    tests,
35
35
    trace,
 
36
    transport,
36
37
    )
37
 
from bzrlib.tests import test_server
38
 
 
39
 
 
40
 
class test_filesystem(medusa.filesys.os_filesystem):
41
 
    """A custom filesystem wrapper to add missing functionalities."""
42
 
 
43
 
    def chmod(self, path, mode):
44
 
        p = self.normalize(self.path_module.join (self.wd, path))
45
 
        return os.chmod(self.translate(p), mode)
46
38
 
47
39
 
48
40
class test_authorizer(object):
72
64
            and password != self.secured_password):
73
65
            return 0, 'Password invalid.', None
74
66
        else:
75
 
            return 1, 'OK.', test_filesystem(self.root)
 
67
            return 1, 'OK.', medusa.filesys.os_filesystem(self.root)
76
68
 
77
69
 
78
70
class ftp_channel(medusa.ftp_server.ftp_channel):
126
118
    def cmd_size(self, line):
127
119
        """Return the size of a file
128
120
 
129
 
        This is overloaded to help the test suite determine if the
 
121
        This is overloaded to help the test suite determine if the 
130
122
        target is a directory.
131
123
        """
132
124
        filename = line[1]
136
128
            else:
137
129
                self.respond('550 "%s" is not a file' % (filename,))
138
130
        else:
139
 
            self.respond('213 %d'
 
131
            self.respond('213 %d' 
140
132
                % (self.filesystem.stat(filename)[stat.ST_SIZE]),)
141
133
 
142
134
    def cmd_mkd(self, line):
161
153
            except:
162
154
                self.respond ('550 error creating directory.')
163
155
 
164
 
    def cmd_site(self, line):
165
 
        """Site specific commands."""
166
 
        command, args = line[1].split(' ', 1)
167
 
        if command.lower() == 'chmod':
168
 
            try:
169
 
                mode, path = args.split()
170
 
                mode = int(mode, 8)
171
 
            except ValueError:
172
 
                # We catch both malformed line and malformed mode with the same
173
 
                # ValueError.
174
 
                self.command_not_understood(' '.join(line))
175
 
                return
176
 
            try:
177
 
                # Yes path and mode are reversed
178
 
                self.filesystem.chmod(path, mode)
179
 
                self.respond('200 SITE CHMOD command successful')
180
 
            except AttributeError:
181
 
                # The chmod method is not available in read-only and will raise
182
 
                # AttributeError since a different filesystem is used in that
183
 
                # case
184
 
                self.command_not_authorized(' '.join(line))
185
 
        else:
186
 
            # Another site specific command was requested. We don't know that
187
 
            # one
188
 
            self.command_not_understood(' '.join(line))
189
 
 
190
156
 
191
157
class ftp_server(medusa.ftp_server.ftp_server):
192
158
    """Customize the behavior of the Medusa ftp_server.
210
176
        trace.mutter('ftp_server %s: %s', type, message)
211
177
 
212
178
 
213
 
class FTPTestServer(test_server.TestServer):
 
179
class FTPServer(transport.Server):
214
180
    """Common code for FTP server facilities."""
215
181
 
216
 
    no_unicode_support = True
217
 
 
218
182
    def __init__(self):
219
183
        self._root = None
220
184
        self._ftp_server = None
235
199
        """This is used by medusa.ftp_server to log connections, etc."""
236
200
        self.logs.append(message)
237
201
 
238
 
    def start_server(self, vfs_server=None):
239
 
        if not (vfs_server is None or isinstance(vfs_server,
240
 
                                                 test_server.LocalURLServer)):
241
 
            raise AssertionError(
242
 
                "FTPServer currently assumes local transport, got %s" % vfs_server)
 
202
    def setUp(self, vfs_server=None):
 
203
        from bzrlib.transport.local import LocalURLServer
 
204
        assert vfs_server is None or isinstance(vfs_server, LocalURLServer), \
 
205
            "FTPServer currently assumes local transport, got %s" % vfs_server
 
206
 
243
207
        self._root = os.getcwdu()
244
208
        self._ftp_server = ftp_server(
245
209
            authorizer=test_authorizer(root=self._root),
252
216
        # Don't let it loop forever, or handle an infinite number of requests.
253
217
        # In this case it will run for 1000s, or 10000 requests
254
218
        self._async_thread = threading.Thread(
255
 
                target=FTPTestServer._asyncore_loop_ignore_EBADF,
 
219
                target=FTPServer._asyncore_loop_ignore_EBADF,
256
220
                kwargs={'timeout':0.1, 'count':10000})
257
 
        if 'threads' in tests.selftest_debug_flags:
258
 
            sys.stderr.write('Thread started: %s\n'
259
 
                             % (self._async_thread.ident,))
260
221
        self._async_thread.setDaemon(True)
261
222
        self._async_thread.start()
262
223
 
263
 
    def stop_server(self):
 
224
    def tearDown(self):
 
225
        """See bzrlib.transport.Server.tearDown."""
264
226
        self._ftp_server.close()
265
227
        asyncore.close_all()
266
228
        self._async_thread.join()
267
 
        if 'threads' in tests.selftest_debug_flags:
268
 
            sys.stderr.write('Thread  joined: %s\n'
269
 
                             % (self._async_thread.ident,))
270
229
 
271
230
    @staticmethod
272
231
    def _asyncore_loop_ignore_EBADF(*args, **kwargs):
286
245
            if e.args[0] != errno.EBADF:
287
246
                raise
288
247
 
289
 
    def add_user(self, user, password):
290
 
        """Add a user with write access."""
291
 
        authorizer = server = self._ftp_server.authorizer
292
 
        authorizer.secured_user = user
293
 
        authorizer.secured_password = password
 
248
 
 
249
 
294
250