~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/ftp_server/pyftpdlib_based.py

  • Committer: Vincent Ladeuil
  • Date: 2017-01-17 13:48:10 UTC
  • mfrom: (6615.3.6 merges)
  • mto: This revision was merged to the branch mainline in revision 6620.
  • Revision ID: v.ladeuil+lp@free.fr-20170117134810-j9p3lidfy6pfyfsc
Merge 2.7, resolving conflicts

Show diffs side-by-side

added added

removed removed

Lines of Context:
34
34
from bzrlib.tests import test_server
35
35
 
36
36
 
 
37
# Convert the pyftplib string version into a tuple to avoid traps in string
 
38
# comparison.
 
39
pyftplib_version = tuple(map(int, ftpserver.__ver__.split('.')))
 
40
 
 
41
 
37
42
class AnonymousWithWriteAccessAuthorizer(ftpserver.DummyAuthorizer):
38
43
 
39
44
    def _check_permissions(self, username, perm):
91
96
        else:
92
97
            ftpserver.FTPHandler.ftp_NLST(self, path)
93
98
 
94
 
    def ftp_SITE_CHMOD(self, line):
95
 
        try:
96
 
            mode, path = line.split(None, 1)
97
 
            mode = int(mode, 8)
98
 
        except ValueError:
99
 
            # We catch both malformed line and malformed mode with the same
100
 
            # ValueError.
101
 
            self.respond("500 'SITE CHMOD %s': command not understood."
102
 
                         % line)
103
 
            self.log('FAIL SITE CHMOD ' % line)
104
 
            return
105
 
        ftp_path = self.fs.fs2ftp(path)
106
 
        try:
107
 
            self.run_as_current_user(self.fs.chmod, self.fs.ftp2fs(path), mode)
108
 
        except OSError, err:
109
 
            why = ftpserver._strerror(err)
110
 
            self.log('FAIL SITE CHMOD 0%03o "%s". %s.' % (mode, ftp_path, why))
111
 
            self.respond('550 %s.' % why)
112
 
        else:
113
 
            self.log('OK SITE CHMOD 0%03o "%s".' % (mode, ftp_path))
114
 
            self.respond('200 SITE CHMOD succesful.')
115
 
 
116
 
 
117
 
# pyftpdlib says to define SITE commands by declaring ftp_SITE_<CMD> methods,
118
 
# but fails to recognize them.
119
 
ftpserver.proto_cmds['SITE CHMOD'] = ftpserver._CommandProperty(
120
 
    perm='w', # Best fit choice even if not exactly right (can be d, f or m too)
121
 
    auth_needed=True, arg_needed=True, check_path=False,
122
 
    help='Syntax: SITE CHMOD <SP>  octal_mode_bits file-name (chmod file)',
123
 
    )
124
 
# An empty password is valid, hence the arg is neither mandatory not forbidden
125
 
ftpserver.proto_cmds['PASS'].arg_needed = None
126
 
 
 
99
    def log_cmd(self, cmd, arg, respcode, respstr):
 
100
        # base class version choke on unicode, the alternative is to just
 
101
        # provide an empty implementation and relies on the client to do
 
102
        # the logging for debugging purposes. Not worth the trouble so far
 
103
        # -- vila 20110607
 
104
        if cmd in ("DELE", "RMD", "RNFR", "RNTO", "MKD"):
 
105
            line = '"%s" %s' % (' '.join([cmd, unicode(arg)]).strip(), respcode)
 
106
            self.log(line)
 
107
 
 
108
 
 
109
# An empty password is valid, hence the arg is neither mandatory nor forbidden
 
110
ftpserver.proto_cmds['PASS']['arg'] = None
127
111
 
128
112
class ftp_server(ftpserver.FTPServer):
129
113
 
168
152
 
169
153
        address = ('localhost', 0) # bind to a random port
170
154
        authorizer = AnonymousWithWriteAccessAuthorizer()
171
 
        authorizer.add_anonymous(self._root, perm='elradfmw')
 
155
        authorizer.add_anonymous(self._root, perm='elradfmwM')
172
156
        self._ftp_server = ftp_server(address, BzrConformingFTPHandler,
173
157
                                      authorizer)
174
158
        # This is hacky as hell, will not work if we need two servers working
204
188
                             % (self._ftpd_thread.ident,))
205
189
 
206
190
    def _run_server(self):
207
 
        """Run the server until stop_server is called, shut it down properly then.
 
191
        """Run the server until stop_server is called.
 
192
 
 
193
        Shut it down properly then.
208
194
        """
209
195
        self._ftpd_running = True
210
196
        self._ftpd_starting.release()
219
205
    def add_user(self, user, password):
220
206
        """Add a user with write access."""
221
207
        self._ftp_server.authorizer.add_user(user, password, self._root,
222
 
                                             perm='elradfmw')
 
208
                                             perm='elradfmwM')