~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/stub_sftp.py

  • Committer: mbp at sourcefrog
  • Date: 2005-03-25 01:16:46 UTC
  • Revision ID: mbp@sourcefrog.net-20050325011646-e3f0af5d6bd1190c
- update version string
- put it in bzrlib

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2008-2011 Robey Pointer <robey@lag.net>, Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""
18
 
A stub SFTP server for loopback SFTP testing.
19
 
Adapted from the one in paramiko's unit tests.
20
 
"""
21
 
 
22
 
import os
23
 
import paramiko
24
 
import socket
25
 
import SocketServer
26
 
import sys
27
 
import time
28
 
 
29
 
from bzrlib import (
30
 
    osutils,
31
 
    trace,
32
 
    urlutils,
33
 
    )
34
 
from bzrlib.transport import (
35
 
    ssh,
36
 
    )
37
 
from bzrlib.tests import test_server
38
 
 
39
 
 
40
 
class StubServer(paramiko.ServerInterface):
41
 
 
42
 
    def __init__(self, test_case_server):
43
 
        paramiko.ServerInterface.__init__(self)
44
 
        self.log = test_case_server.log
45
 
 
46
 
    def check_auth_password(self, username, password):
47
 
        # all are allowed
48
 
        self.log('sftpserver - authorizing: %s' % (username,))
49
 
        return paramiko.AUTH_SUCCESSFUL
50
 
 
51
 
    def check_channel_request(self, kind, chanid):
52
 
        self.log('sftpserver - channel request: %s, %s' % (kind, chanid))
53
 
        return paramiko.OPEN_SUCCEEDED
54
 
 
55
 
 
56
 
class StubSFTPHandle(paramiko.SFTPHandle):
57
 
 
58
 
    def stat(self):
59
 
        try:
60
 
            return paramiko.SFTPAttributes.from_stat(
61
 
                os.fstat(self.readfile.fileno()))
62
 
        except OSError, e:
63
 
            return paramiko.SFTPServer.convert_errno(e.errno)
64
 
 
65
 
    def chattr(self, attr):
66
 
        # python doesn't have equivalents to fchown or fchmod, so we have to
67
 
        # use the stored filename
68
 
        trace.mutter('Changing permissions on %s to %s', self.filename, attr)
69
 
        try:
70
 
            paramiko.SFTPServer.set_file_attr(self.filename, attr)
71
 
        except OSError, e:
72
 
            return paramiko.SFTPServer.convert_errno(e.errno)
73
 
 
74
 
 
75
 
class StubSFTPServer(paramiko.SFTPServerInterface):
76
 
 
77
 
    def __init__(self, server, root, home=None):
78
 
        paramiko.SFTPServerInterface.__init__(self, server)
79
 
        # All paths are actually relative to 'root'.
80
 
        # this is like implementing chroot().
81
 
        self.root = root
82
 
        if home is None:
83
 
            self.home = ''
84
 
        else:
85
 
            if not home.startswith(self.root):
86
 
                raise AssertionError(
87
 
                    "home must be a subdirectory of root (%s vs %s)"
88
 
                    % (home, root))
89
 
            self.home = home[len(self.root):]
90
 
        if self.home.startswith('/'):
91
 
            self.home = self.home[1:]
92
 
        server.log('sftpserver - new connection')
93
 
 
94
 
    def _realpath(self, path):
95
 
        # paths returned from self.canonicalize() always start with
96
 
        # a path separator. So if 'root' is just '/', this would cause
97
 
        # a double slash at the beginning '//home/dir'.
98
 
        if self.root == '/':
99
 
            return self.canonicalize(path)
100
 
        return self.root + self.canonicalize(path)
101
 
 
102
 
    if sys.platform == 'win32':
103
 
        def canonicalize(self, path):
104
 
            # Win32 sftp paths end up looking like
105
 
            #     sftp://host@foo/h:/foo/bar
106
 
            # which means absolute paths look like:
107
 
            #     /h:/foo/bar
108
 
            # and relative paths stay the same:
109
 
            #     foo/bar
110
 
            # win32 needs to use the Unicode APIs. so we require the
111
 
            # paths to be utf8 (Linux just uses bytestreams)
112
 
            thispath = path.decode('utf8')
113
 
            if path.startswith('/'):
114
 
                # Abspath H:/foo/bar
115
 
                return os.path.normpath(thispath[1:])
116
 
            else:
117
 
                return os.path.normpath(os.path.join(self.home, thispath))
118
 
    else:
119
 
        def canonicalize(self, path):
120
 
            if os.path.isabs(path):
121
 
                return os.path.normpath(path)
122
 
            else:
123
 
                return os.path.normpath('/' + os.path.join(self.home, path))
124
 
 
125
 
    def chattr(self, path, attr):
126
 
        try:
127
 
            paramiko.SFTPServer.set_file_attr(path, attr)
128
 
        except OSError, e:
129
 
            return paramiko.SFTPServer.convert_errno(e.errno)
130
 
        return paramiko.SFTP_OK
131
 
 
132
 
    def list_folder(self, path):
133
 
        path = self._realpath(path)
134
 
        try:
135
 
            out = [ ]
136
 
            # TODO: win32 incorrectly lists paths with non-ascii if path is not
137
 
            # unicode. However on unix the server should only deal with
138
 
            # bytestreams and posix.listdir does the right thing
139
 
            if sys.platform == 'win32':
140
 
                flist = [f.encode('utf8') for f in os.listdir(path)]
141
 
            else:
142
 
                flist = os.listdir(path)
143
 
            for fname in flist:
144
 
                attr = paramiko.SFTPAttributes.from_stat(
145
 
                    os.stat(osutils.pathjoin(path, fname)))
146
 
                attr.filename = fname
147
 
                out.append(attr)
148
 
            return out
149
 
        except OSError, e:
150
 
            return paramiko.SFTPServer.convert_errno(e.errno)
151
 
 
152
 
    def stat(self, path):
153
 
        path = self._realpath(path)
154
 
        try:
155
 
            return paramiko.SFTPAttributes.from_stat(os.stat(path))
156
 
        except OSError, e:
157
 
            return paramiko.SFTPServer.convert_errno(e.errno)
158
 
 
159
 
    def lstat(self, path):
160
 
        path = self._realpath(path)
161
 
        try:
162
 
            return paramiko.SFTPAttributes.from_stat(os.lstat(path))
163
 
        except OSError, e:
164
 
            return paramiko.SFTPServer.convert_errno(e.errno)
165
 
 
166
 
    def open(self, path, flags, attr):
167
 
        path = self._realpath(path)
168
 
        try:
169
 
            flags |= getattr(os, 'O_BINARY', 0)
170
 
            if getattr(attr, 'st_mode', None):
171
 
                fd = os.open(path, flags, attr.st_mode)
172
 
            else:
173
 
                # os.open() defaults to 0777 which is
174
 
                # an odd default mode for files
175
 
                fd = os.open(path, flags, 0666)
176
 
        except OSError, e:
177
 
            return paramiko.SFTPServer.convert_errno(e.errno)
178
 
 
179
 
        if (flags & os.O_CREAT) and (attr is not None):
180
 
            attr._flags &= ~attr.FLAG_PERMISSIONS
181
 
            paramiko.SFTPServer.set_file_attr(path, attr)
182
 
        if flags & os.O_WRONLY:
183
 
            fstr = 'wb'
184
 
        elif flags & os.O_RDWR:
185
 
            fstr = 'rb+'
186
 
        else:
187
 
            # O_RDONLY (== 0)
188
 
            fstr = 'rb'
189
 
        try:
190
 
            f = os.fdopen(fd, fstr)
191
 
        except (IOError, OSError), e:
192
 
            return paramiko.SFTPServer.convert_errno(e.errno)
193
 
        fobj = StubSFTPHandle()
194
 
        fobj.filename = path
195
 
        fobj.readfile = f
196
 
        fobj.writefile = f
197
 
        return fobj
198
 
 
199
 
    def remove(self, path):
200
 
        path = self._realpath(path)
201
 
        try:
202
 
            os.remove(path)
203
 
        except OSError, e:
204
 
            return paramiko.SFTPServer.convert_errno(e.errno)
205
 
        return paramiko.SFTP_OK
206
 
 
207
 
    def rename(self, oldpath, newpath):
208
 
        oldpath = self._realpath(oldpath)
209
 
        newpath = self._realpath(newpath)
210
 
        try:
211
 
            os.rename(oldpath, newpath)
212
 
        except OSError, e:
213
 
            return paramiko.SFTPServer.convert_errno(e.errno)
214
 
        return paramiko.SFTP_OK
215
 
 
216
 
    def mkdir(self, path, attr):
217
 
        path = self._realpath(path)
218
 
        try:
219
 
            # Using getattr() in case st_mode is None or 0
220
 
            # both evaluate to False
221
 
            if getattr(attr, 'st_mode', None):
222
 
                os.mkdir(path, attr.st_mode)
223
 
            else:
224
 
                os.mkdir(path)
225
 
            if attr is not None:
226
 
                attr._flags &= ~attr.FLAG_PERMISSIONS
227
 
                paramiko.SFTPServer.set_file_attr(path, attr)
228
 
        except OSError, e:
229
 
            return paramiko.SFTPServer.convert_errno(e.errno)
230
 
        return paramiko.SFTP_OK
231
 
 
232
 
    def rmdir(self, path):
233
 
        path = self._realpath(path)
234
 
        try:
235
 
            os.rmdir(path)
236
 
        except OSError, e:
237
 
            return paramiko.SFTPServer.convert_errno(e.errno)
238
 
        return paramiko.SFTP_OK
239
 
 
240
 
    # removed: chattr, symlink, readlink
241
 
    # (nothing in bzr's sftp transport uses those)
242
 
 
243
 
 
244
 
# ------------- server test implementation --------------
245
 
 
246
 
STUB_SERVER_KEY = """
247
 
-----BEGIN RSA PRIVATE KEY-----
248
 
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
249
 
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
250
 
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
251
 
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
252
 
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
253
 
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
254
 
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
255
 
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
256
 
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
257
 
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
258
 
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
259
 
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
260
 
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
261
 
-----END RSA PRIVATE KEY-----
262
 
"""
263
 
 
264
 
 
265
 
class SocketDelay(object):
266
 
    """A socket decorator to make TCP appear slower.
267
 
 
268
 
    This changes recv, send, and sendall to add a fixed latency to each python
269
 
    call if a new roundtrip is detected. That is, when a recv is called and the
270
 
    flag new_roundtrip is set, latency is charged. Every send and send_all
271
 
    sets this flag.
272
 
 
273
 
    In addition every send, sendall and recv sleeps a bit per character send to
274
 
    simulate bandwidth.
275
 
 
276
 
    Not all methods are implemented, this is deliberate as this class is not a
277
 
    replacement for the builtin sockets layer. fileno is not implemented to
278
 
    prevent the proxy being bypassed.
279
 
    """
280
 
 
281
 
    simulated_time = 0
282
 
    _proxied_arguments = dict.fromkeys([
283
 
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
284
 
        "setblocking", "setsockopt", "settimeout", "shutdown"])
285
 
 
286
 
    def __init__(self, sock, latency, bandwidth=1.0,
287
 
                 really_sleep=True):
288
 
        """
289
 
        :param bandwith: simulated bandwith (MegaBit)
290
 
        :param really_sleep: If set to false, the SocketDelay will just
291
 
        increase a counter, instead of calling time.sleep. This is useful for
292
 
        unittesting the SocketDelay.
293
 
        """
294
 
        self.sock = sock
295
 
        self.latency = latency
296
 
        self.really_sleep = really_sleep
297
 
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
298
 
        self.new_roundtrip = False
299
 
 
300
 
    def sleep(self, s):
301
 
        if self.really_sleep:
302
 
            time.sleep(s)
303
 
        else:
304
 
            SocketDelay.simulated_time += s
305
 
 
306
 
    def __getattr__(self, attr):
307
 
        if attr in SocketDelay._proxied_arguments:
308
 
            return getattr(self.sock, attr)
309
 
        raise AttributeError("'SocketDelay' object has no attribute %r" %
310
 
                             attr)
311
 
 
312
 
    def dup(self):
313
 
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
314
 
                           self._sleep)
315
 
 
316
 
    def recv(self, *args):
317
 
        data = self.sock.recv(*args)
318
 
        if data and self.new_roundtrip:
319
 
            self.new_roundtrip = False
320
 
            self.sleep(self.latency)
321
 
        self.sleep(len(data) * self.time_per_byte)
322
 
        return data
323
 
 
324
 
    def sendall(self, data, flags=0):
325
 
        if not self.new_roundtrip:
326
 
            self.new_roundtrip = True
327
 
            self.sleep(self.latency)
328
 
        self.sleep(len(data) * self.time_per_byte)
329
 
        return self.sock.sendall(data, flags)
330
 
 
331
 
    def send(self, data, flags=0):
332
 
        if not self.new_roundtrip:
333
 
            self.new_roundtrip = True
334
 
            self.sleep(self.latency)
335
 
        bytes_sent = self.sock.send(data, flags)
336
 
        self.sleep(bytes_sent * self.time_per_byte)
337
 
        return bytes_sent
338
 
 
339
 
 
340
 
class TestingSFTPConnectionHandler(SocketServer.BaseRequestHandler):
341
 
 
342
 
    def setup(self):
343
 
        self.wrap_for_latency()
344
 
        tcs = self.server.test_case_server
345
 
        ptrans = paramiko.Transport(self.request)
346
 
        self.paramiko_transport = ptrans
347
 
        # Set it to a channel under 'bzr' so that we get debug info
348
 
        ptrans.set_log_channel('bzr.paramiko.transport')
349
 
        ptrans.add_server_key(tcs.get_host_key())
350
 
        ptrans.set_subsystem_handler('sftp', paramiko.SFTPServer,
351
 
                                     StubSFTPServer, root=tcs._root,
352
 
                                     home=tcs._server_homedir)
353
 
        server = tcs._server_interface(tcs)
354
 
        # This blocks until the key exchange has been done
355
 
        ptrans.start_server(None, server)
356
 
 
357
 
    def finish(self):
358
 
        # Wait for the conversation to finish, when the paramiko.Transport
359
 
        # thread finishes
360
 
        # TODO: Consider timing out after XX seconds rather than hanging.
361
 
        #       Also we could check paramiko_transport.active and possibly
362
 
        #       paramiko_transport.getException().
363
 
        self.paramiko_transport.join()
364
 
 
365
 
    def wrap_for_latency(self):
366
 
        tcs = self.server.test_case_server
367
 
        if tcs.add_latency:
368
 
            # Give the socket (which the request really is) a latency adding
369
 
            # decorator.
370
 
            self.request = SocketDelay(self.request, tcs.add_latency)
371
 
 
372
 
 
373
 
class TestingSFTPWithoutSSHConnectionHandler(TestingSFTPConnectionHandler):
374
 
 
375
 
    def setup(self):
376
 
        self.wrap_for_latency()
377
 
        # Re-import these as locals, so that they're still accessible during
378
 
        # interpreter shutdown (when all module globals get set to None, leading
379
 
        # to confusing errors like "'NoneType' object has no attribute 'error'".
380
 
        class FakeChannel(object):
381
 
            def get_transport(self):
382
 
                return self
383
 
            def get_log_channel(self):
384
 
                return 'bzr.paramiko'
385
 
            def get_name(self):
386
 
                return '1'
387
 
            def get_hexdump(self):
388
 
                return False
389
 
            def close(self):
390
 
                pass
391
 
 
392
 
        tcs = self.server.test_case_server
393
 
        sftp_server = paramiko.SFTPServer(
394
 
            FakeChannel(), 'sftp', StubServer(tcs), StubSFTPServer,
395
 
            root=tcs._root, home=tcs._server_homedir)
396
 
        self.sftp_server = sftp_server
397
 
        sys_stderr = sys.stderr # Used in error reporting during shutdown
398
 
        try:
399
 
            sftp_server.start_subsystem(
400
 
                'sftp', None, ssh.SocketAsChannelAdapter(self.request))
401
 
        except socket.error, e:
402
 
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
403
 
                # it's okay for the client to disconnect abruptly
404
 
                # (bug in paramiko 1.6: it should absorb this exception)
405
 
                pass
406
 
            else:
407
 
                raise
408
 
        except Exception, e:
409
 
            # This typically seems to happen during interpreter shutdown, so
410
 
            # most of the useful ways to report this error won't work.
411
 
            # Writing the exception type, and then the text of the exception,
412
 
            # seems to be the best we can do.
413
 
            # FIXME: All interpreter shutdown errors should have been related
414
 
            # to daemon threads, cleanup needed -- vila 20100623
415
 
            sys_stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
416
 
            sys_stderr.write('%s\n\n' % (e,))
417
 
 
418
 
    def finish(self):
419
 
        self.sftp_server.finish_subsystem()
420
 
 
421
 
 
422
 
class TestingSFTPServer(test_server.TestingThreadingTCPServer):
423
 
 
424
 
    def __init__(self, server_address, request_handler_class, test_case_server):
425
 
        test_server.TestingThreadingTCPServer.__init__(
426
 
            self, server_address, request_handler_class)
427
 
        self.test_case_server = test_case_server
428
 
 
429
 
 
430
 
class SFTPServer(test_server.TestingTCPServerInAThread):
431
 
    """Common code for SFTP server facilities."""
432
 
 
433
 
    def __init__(self, server_interface=StubServer):
434
 
        self.host = '127.0.0.1'
435
 
        self.port = 0
436
 
        super(SFTPServer, self).__init__((self.host, self.port),
437
 
                                         TestingSFTPServer,
438
 
                                         TestingSFTPConnectionHandler)
439
 
        self._original_vendor = None
440
 
        self._vendor = ssh.ParamikoVendor()
441
 
        self._server_interface = server_interface
442
 
        self._host_key = None
443
 
        self.logs = []
444
 
        self.add_latency = 0
445
 
        self._homedir = None
446
 
        self._server_homedir = None
447
 
        self._root = None
448
 
 
449
 
    def _get_sftp_url(self, path):
450
 
        """Calculate an sftp url to this server for path."""
451
 
        return "sftp://foo:bar@%s:%s/%s" % (self.host, self.port, path)
452
 
 
453
 
    def log(self, message):
454
 
        """StubServer uses this to log when a new server is created."""
455
 
        self.logs.append(message)
456
 
 
457
 
    def create_server(self):
458
 
        server = self.server_class((self.host, self.port),
459
 
                                   self.request_handler_class,
460
 
                                   self)
461
 
        return server
462
 
 
463
 
    def get_host_key(self):
464
 
        if self._host_key is None:
465
 
            key_file = osutils.pathjoin(self._homedir, 'test_rsa.key')
466
 
            f = open(key_file, 'w')
467
 
            try:
468
 
                f.write(STUB_SERVER_KEY)
469
 
            finally:
470
 
                f.close()
471
 
            self._host_key = paramiko.RSAKey.from_private_key_file(key_file)
472
 
        return self._host_key
473
 
 
474
 
    def start_server(self, backing_server=None):
475
 
        # XXX: TODO: make sftpserver back onto backing_server rather than local
476
 
        # disk.
477
 
        if not (backing_server is None or
478
 
                isinstance(backing_server, test_server.LocalURLServer)):
479
 
            raise AssertionError(
480
 
                'backing_server should not be %r, because this can only serve '
481
 
                'the local current working directory.' % (backing_server,))
482
 
        self._original_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
483
 
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._vendor
484
 
        if sys.platform == 'win32':
485
 
            # Win32 needs to use the UNICODE api
486
 
            self._homedir = os.getcwdu()
487
 
            # Normalize the path or it will be wrongly escaped
488
 
            self._homedir = osutils.normpath(self._homedir)
489
 
        else:
490
 
            # But unix SFTP servers should just deal in bytestreams
491
 
            self._homedir = os.getcwd()
492
 
        if self._server_homedir is None:
493
 
            self._server_homedir = self._homedir
494
 
        self._root = '/'
495
 
        if sys.platform == 'win32':
496
 
            self._root = ''
497
 
        super(SFTPServer, self).start_server()
498
 
 
499
 
    def stop_server(self):
500
 
        try:
501
 
            super(SFTPServer, self).stop_server()
502
 
        finally:
503
 
            ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
504
 
 
505
 
    def get_bogus_url(self):
506
 
        """See bzrlib.transport.Server.get_bogus_url."""
507
 
        # this is chosen to try to prevent trouble with proxies, weird dns, etc
508
 
        # we bind a random socket, so that we get a guaranteed unused port
509
 
        # we just never listen on that port
510
 
        s = socket.socket()
511
 
        s.bind(('localhost', 0))
512
 
        return 'sftp://%s:%s/' % s.getsockname()
513
 
 
514
 
 
515
 
class SFTPFullAbsoluteServer(SFTPServer):
516
 
    """A test server for sftp transports, using absolute urls and ssh."""
517
 
 
518
 
    def get_url(self):
519
 
        """See bzrlib.transport.Server.get_url."""
520
 
        homedir = self._homedir
521
 
        if sys.platform != 'win32':
522
 
            # Remove the initial '/' on all platforms but win32
523
 
            homedir = homedir[1:]
524
 
        return self._get_sftp_url(urlutils.escape(homedir))
525
 
 
526
 
 
527
 
class SFTPServerWithoutSSH(SFTPServer):
528
 
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
529
 
 
530
 
    def __init__(self):
531
 
        super(SFTPServerWithoutSSH, self).__init__()
532
 
        self._vendor = ssh.LoopbackVendor()
533
 
        self.request_handler_class = TestingSFTPWithoutSSHConnectionHandler
534
 
 
535
 
    def get_host_key():
536
 
        return None
537
 
 
538
 
 
539
 
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
540
 
    """A test server for sftp transports, using absolute urls."""
541
 
 
542
 
    def get_url(self):
543
 
        """See bzrlib.transport.Server.get_url."""
544
 
        homedir = self._homedir
545
 
        if sys.platform != 'win32':
546
 
            # Remove the initial '/' on all platforms but win32
547
 
            homedir = homedir[1:]
548
 
        return self._get_sftp_url(urlutils.escape(homedir))
549
 
 
550
 
 
551
 
class SFTPHomeDirServer(SFTPServerWithoutSSH):
552
 
    """A test server for sftp transports, using homedir relative urls."""
553
 
 
554
 
    def get_url(self):
555
 
        """See bzrlib.transport.Server.get_url."""
556
 
        return self._get_sftp_url("~/")
557
 
 
558
 
 
559
 
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
560
 
    """A test server for sftp transports where only absolute paths will work.
561
 
 
562
 
    It does this by serving from a deeply-nested directory that doesn't exist.
563
 
    """
564
 
 
565
 
    def create_server(self):
566
 
        # FIXME: Can't we do that in a cleaner way ? -- vila 20100623
567
 
        server = super(SFTPSiblingAbsoluteServer, self).create_server()
568
 
        server._server_homedir = '/dev/noone/runs/tests/here'
569
 
        return server
570