~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/stub_sftp.py

  • Committer: Martin Pool
  • Date: 2005-06-24 09:03:20 UTC
  • Revision ID: mbp@sourcefrog.net-20050624090320-c34ea3e5aa81d01d
- write new working inventory using AtomicFile

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2008, 2009, 2010 Robey Pointer <robey@lag.net>, Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""
18
 
A stub SFTP server for loopback SFTP testing.
19
 
Adapted from the one in paramiko's unit tests.
20
 
"""
21
 
 
22
 
import os
23
 
import paramiko
24
 
import select
25
 
import socket
26
 
import sys
27
 
import threading
28
 
import time
29
 
 
30
 
from bzrlib import (
31
 
    osutils,
32
 
    trace,
33
 
    urlutils,
34
 
    )
35
 
from bzrlib.transport import (
36
 
    ssh,
37
 
    )
38
 
from bzrlib.tests import test_server
39
 
 
40
 
 
41
 
class StubServer (paramiko.ServerInterface):
42
 
 
43
 
    def __init__(self, test_case):
44
 
        paramiko.ServerInterface.__init__(self)
45
 
        self._test_case = test_case
46
 
 
47
 
    def check_auth_password(self, username, password):
48
 
        # all are allowed
49
 
        self._test_case.log('sftpserver - authorizing: %s' % (username,))
50
 
        return paramiko.AUTH_SUCCESSFUL
51
 
 
52
 
    def check_channel_request(self, kind, chanid):
53
 
        self._test_case.log(
54
 
            'sftpserver - channel request: %s, %s' % (kind, chanid))
55
 
        return paramiko.OPEN_SUCCEEDED
56
 
 
57
 
 
58
 
class StubSFTPHandle (paramiko.SFTPHandle):
59
 
    def stat(self):
60
 
        try:
61
 
            return paramiko.SFTPAttributes.from_stat(
62
 
                os.fstat(self.readfile.fileno()))
63
 
        except OSError, e:
64
 
            return paramiko.SFTPServer.convert_errno(e.errno)
65
 
 
66
 
    def chattr(self, attr):
67
 
        # python doesn't have equivalents to fchown or fchmod, so we have to
68
 
        # use the stored filename
69
 
        trace.mutter('Changing permissions on %s to %s', self.filename, attr)
70
 
        try:
71
 
            paramiko.SFTPServer.set_file_attr(self.filename, attr)
72
 
        except OSError, e:
73
 
            return paramiko.SFTPServer.convert_errno(e.errno)
74
 
 
75
 
 
76
 
class StubSFTPServer (paramiko.SFTPServerInterface):
77
 
 
78
 
    def __init__(self, server, root, home=None):
79
 
        paramiko.SFTPServerInterface.__init__(self, server)
80
 
        # All paths are actually relative to 'root'.
81
 
        # this is like implementing chroot().
82
 
        self.root = root
83
 
        if home is None:
84
 
            self.home = ''
85
 
        else:
86
 
            if not home.startswith(self.root):
87
 
                raise AssertionError(
88
 
                    "home must be a subdirectory of root (%s vs %s)"
89
 
                    % (home, root))
90
 
            self.home = home[len(self.root):]
91
 
        if self.home.startswith('/'):
92
 
            self.home = self.home[1:]
93
 
        server._test_case.log('sftpserver - new connection')
94
 
 
95
 
    def _realpath(self, path):
96
 
        # paths returned from self.canonicalize() always start with
97
 
        # a path separator. So if 'root' is just '/', this would cause
98
 
        # a double slash at the beginning '//home/dir'.
99
 
        if self.root == '/':
100
 
            return self.canonicalize(path)
101
 
        return self.root + self.canonicalize(path)
102
 
 
103
 
    if sys.platform == 'win32':
104
 
        def canonicalize(self, path):
105
 
            # Win32 sftp paths end up looking like
106
 
            #     sftp://host@foo/h:/foo/bar
107
 
            # which means absolute paths look like:
108
 
            #     /h:/foo/bar
109
 
            # and relative paths stay the same:
110
 
            #     foo/bar
111
 
            # win32 needs to use the Unicode APIs. so we require the
112
 
            # paths to be utf8 (Linux just uses bytestreams)
113
 
            thispath = path.decode('utf8')
114
 
            if path.startswith('/'):
115
 
                # Abspath H:/foo/bar
116
 
                return os.path.normpath(thispath[1:])
117
 
            else:
118
 
                return os.path.normpath(os.path.join(self.home, thispath))
119
 
    else:
120
 
        def canonicalize(self, path):
121
 
            if os.path.isabs(path):
122
 
                return os.path.normpath(path)
123
 
            else:
124
 
                return os.path.normpath('/' + os.path.join(self.home, path))
125
 
 
126
 
    def chattr(self, path, attr):
127
 
        try:
128
 
            paramiko.SFTPServer.set_file_attr(path, attr)
129
 
        except OSError, e:
130
 
            return paramiko.SFTPServer.convert_errno(e.errno)
131
 
        return paramiko.SFTP_OK
132
 
 
133
 
    def list_folder(self, path):
134
 
        path = self._realpath(path)
135
 
        try:
136
 
            out = [ ]
137
 
            # TODO: win32 incorrectly lists paths with non-ascii if path is not
138
 
            # unicode. However on Linux the server should only deal with
139
 
            # bytestreams and posix.listdir does the right thing
140
 
            if sys.platform == 'win32':
141
 
                flist = [f.encode('utf8') for f in os.listdir(path)]
142
 
            else:
143
 
                flist = os.listdir(path)
144
 
            for fname in flist:
145
 
                attr = paramiko.SFTPAttributes.from_stat(
146
 
                    os.stat(osutils.pathjoin(path, fname)))
147
 
                attr.filename = fname
148
 
                out.append(attr)
149
 
            return out
150
 
        except OSError, e:
151
 
            return paramiko.SFTPServer.convert_errno(e.errno)
152
 
 
153
 
    def stat(self, path):
154
 
        path = self._realpath(path)
155
 
        try:
156
 
            return paramiko.SFTPAttributes.from_stat(os.stat(path))
157
 
        except OSError, e:
158
 
            return paramiko.SFTPServer.convert_errno(e.errno)
159
 
 
160
 
    def lstat(self, path):
161
 
        path = self._realpath(path)
162
 
        try:
163
 
            return paramiko.SFTPAttributes.from_stat(os.lstat(path))
164
 
        except OSError, e:
165
 
            return paramiko.SFTPServer.convert_errno(e.errno)
166
 
 
167
 
    def open(self, path, flags, attr):
168
 
        path = self._realpath(path)
169
 
        try:
170
 
            flags |= getattr(os, 'O_BINARY', 0)
171
 
            if getattr(attr, 'st_mode', None):
172
 
                fd = os.open(path, flags, attr.st_mode)
173
 
            else:
174
 
                # os.open() defaults to 0777 which is
175
 
                # an odd default mode for files
176
 
                fd = os.open(path, flags, 0666)
177
 
        except OSError, e:
178
 
            return paramiko.SFTPServer.convert_errno(e.errno)
179
 
 
180
 
        if (flags & os.O_CREAT) and (attr is not None):
181
 
            attr._flags &= ~attr.FLAG_PERMISSIONS
182
 
            paramiko.SFTPServer.set_file_attr(path, attr)
183
 
        if flags & os.O_WRONLY:
184
 
            fstr = 'wb'
185
 
        elif flags & os.O_RDWR:
186
 
            fstr = 'rb+'
187
 
        else:
188
 
            # O_RDONLY (== 0)
189
 
            fstr = 'rb'
190
 
        try:
191
 
            f = os.fdopen(fd, fstr)
192
 
        except (IOError, OSError), e:
193
 
            return paramiko.SFTPServer.convert_errno(e.errno)
194
 
        fobj = StubSFTPHandle()
195
 
        fobj.filename = path
196
 
        fobj.readfile = f
197
 
        fobj.writefile = f
198
 
        return fobj
199
 
 
200
 
    def remove(self, path):
201
 
        path = self._realpath(path)
202
 
        try:
203
 
            os.remove(path)
204
 
        except OSError, e:
205
 
            return paramiko.SFTPServer.convert_errno(e.errno)
206
 
        return paramiko.SFTP_OK
207
 
 
208
 
    def rename(self, oldpath, newpath):
209
 
        oldpath = self._realpath(oldpath)
210
 
        newpath = self._realpath(newpath)
211
 
        try:
212
 
            os.rename(oldpath, newpath)
213
 
        except OSError, e:
214
 
            return paramiko.SFTPServer.convert_errno(e.errno)
215
 
        return paramiko.SFTP_OK
216
 
 
217
 
    def mkdir(self, path, attr):
218
 
        path = self._realpath(path)
219
 
        try:
220
 
            # Using getattr() in case st_mode is None or 0
221
 
            # both evaluate to False
222
 
            if getattr(attr, 'st_mode', None):
223
 
                os.mkdir(path, attr.st_mode)
224
 
            else:
225
 
                os.mkdir(path)
226
 
            if attr is not None:
227
 
                attr._flags &= ~attr.FLAG_PERMISSIONS
228
 
                paramiko.SFTPServer.set_file_attr(path, attr)
229
 
        except OSError, e:
230
 
            return paramiko.SFTPServer.convert_errno(e.errno)
231
 
        return paramiko.SFTP_OK
232
 
 
233
 
    def rmdir(self, path):
234
 
        path = self._realpath(path)
235
 
        try:
236
 
            os.rmdir(path)
237
 
        except OSError, e:
238
 
            return paramiko.SFTPServer.convert_errno(e.errno)
239
 
        return paramiko.SFTP_OK
240
 
 
241
 
    # removed: chattr, symlink, readlink
242
 
    # (nothing in bzr's sftp transport uses those)
243
 
 
244
 
# ------------- server test implementation --------------
245
 
 
246
 
STUB_SERVER_KEY = """
247
 
-----BEGIN RSA PRIVATE KEY-----
248
 
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
249
 
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
250
 
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
251
 
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
252
 
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
253
 
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
254
 
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
255
 
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
256
 
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
257
 
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
258
 
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
259
 
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
260
 
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
261
 
-----END RSA PRIVATE KEY-----
262
 
"""
263
 
 
264
 
 
265
 
class SocketListener(threading.Thread):
266
 
 
267
 
    def __init__(self, callback):
268
 
        threading.Thread.__init__(self)
269
 
        self._callback = callback
270
 
        self._socket = socket.socket()
271
 
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
272
 
        self._socket.bind(('localhost', 0))
273
 
        self._socket.listen(1)
274
 
        self.host, self.port = self._socket.getsockname()[:2]
275
 
        self._stop_event = threading.Event()
276
 
 
277
 
    def stop(self):
278
 
        # called from outside this thread
279
 
        self._stop_event.set()
280
 
        # use a timeout here, because if the test fails, the server thread may
281
 
        # never notice the stop_event.
282
 
        self.join(5.0)
283
 
        self._socket.close()
284
 
 
285
 
    def run(self):
286
 
        while True:
287
 
            readable, writable_unused, exception_unused = \
288
 
                select.select([self._socket], [], [], 0.1)
289
 
            if self._stop_event.isSet():
290
 
                return
291
 
            if len(readable) == 0:
292
 
                continue
293
 
            try:
294
 
                s, addr_unused = self._socket.accept()
295
 
                # because the loopback socket is inline, and transports are
296
 
                # never explicitly closed, best to launch a new thread.
297
 
                threading.Thread(target=self._callback, args=(s,)).start()
298
 
            except socket.error, x:
299
 
                sys.excepthook(*sys.exc_info())
300
 
                warning('Socket error during accept() within unit test server'
301
 
                        ' thread: %r' % x)
302
 
            except Exception, x:
303
 
                # probably a failed test; unit test thread will log the
304
 
                # failure/error
305
 
                sys.excepthook(*sys.exc_info())
306
 
                warning('Exception from within unit test server thread: %r' %
307
 
                        x)
308
 
 
309
 
 
310
 
class SocketDelay(object):
311
 
    """A socket decorator to make TCP appear slower.
312
 
 
313
 
    This changes recv, send, and sendall to add a fixed latency to each python
314
 
    call if a new roundtrip is detected. That is, when a recv is called and the
315
 
    flag new_roundtrip is set, latency is charged. Every send and send_all
316
 
    sets this flag.
317
 
 
318
 
    In addition every send, sendall and recv sleeps a bit per character send to
319
 
    simulate bandwidth.
320
 
 
321
 
    Not all methods are implemented, this is deliberate as this class is not a
322
 
    replacement for the builtin sockets layer. fileno is not implemented to
323
 
    prevent the proxy being bypassed.
324
 
    """
325
 
 
326
 
    simulated_time = 0
327
 
    _proxied_arguments = dict.fromkeys([
328
 
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
329
 
        "setblocking", "setsockopt", "settimeout", "shutdown"])
330
 
 
331
 
    def __init__(self, sock, latency, bandwidth=1.0,
332
 
                 really_sleep=True):
333
 
        """
334
 
        :param bandwith: simulated bandwith (MegaBit)
335
 
        :param really_sleep: If set to false, the SocketDelay will just
336
 
        increase a counter, instead of calling time.sleep. This is useful for
337
 
        unittesting the SocketDelay.
338
 
        """
339
 
        self.sock = sock
340
 
        self.latency = latency
341
 
        self.really_sleep = really_sleep
342
 
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
343
 
        self.new_roundtrip = False
344
 
 
345
 
    def sleep(self, s):
346
 
        if self.really_sleep:
347
 
            time.sleep(s)
348
 
        else:
349
 
            SocketDelay.simulated_time += s
350
 
 
351
 
    def __getattr__(self, attr):
352
 
        if attr in SocketDelay._proxied_arguments:
353
 
            return getattr(self.sock, attr)
354
 
        raise AttributeError("'SocketDelay' object has no attribute %r" %
355
 
                             attr)
356
 
 
357
 
    def dup(self):
358
 
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
359
 
                           self._sleep)
360
 
 
361
 
    def recv(self, *args):
362
 
        data = self.sock.recv(*args)
363
 
        if data and self.new_roundtrip:
364
 
            self.new_roundtrip = False
365
 
            self.sleep(self.latency)
366
 
        self.sleep(len(data) * self.time_per_byte)
367
 
        return data
368
 
 
369
 
    def sendall(self, data, flags=0):
370
 
        if not self.new_roundtrip:
371
 
            self.new_roundtrip = True
372
 
            self.sleep(self.latency)
373
 
        self.sleep(len(data) * self.time_per_byte)
374
 
        return self.sock.sendall(data, flags)
375
 
 
376
 
    def send(self, data, flags=0):
377
 
        if not self.new_roundtrip:
378
 
            self.new_roundtrip = True
379
 
            self.sleep(self.latency)
380
 
        bytes_sent = self.sock.send(data, flags)
381
 
        self.sleep(bytes_sent * self.time_per_byte)
382
 
        return bytes_sent
383
 
 
384
 
 
385
 
class SFTPServer(test_server.TestServer):
386
 
    """Common code for SFTP server facilities."""
387
 
 
388
 
    def __init__(self, server_interface=StubServer):
389
 
        self._original_vendor = None
390
 
        self._homedir = None
391
 
        self._server_homedir = None
392
 
        self._listener = None
393
 
        self._root = None
394
 
        self._vendor = ssh.ParamikoVendor()
395
 
        self._server_interface = server_interface
396
 
        # sftp server logs
397
 
        self.logs = []
398
 
        self.add_latency = 0
399
 
 
400
 
    def _get_sftp_url(self, path):
401
 
        """Calculate an sftp url to this server for path."""
402
 
        return 'sftp://foo:bar@%s:%d/%s' % (self._listener.host,
403
 
                                            self._listener.port, path)
404
 
 
405
 
    def log(self, message):
406
 
        """StubServer uses this to log when a new server is created."""
407
 
        self.logs.append(message)
408
 
 
409
 
    def _run_server_entry(self, sock):
410
 
        """Entry point for all implementations of _run_server.
411
 
 
412
 
        If self.add_latency is > 0.000001 then sock is given a latency adding
413
 
        decorator.
414
 
        """
415
 
        if self.add_latency > 0.000001:
416
 
            sock = SocketDelay(sock, self.add_latency)
417
 
        return self._run_server(sock)
418
 
 
419
 
    def _run_server(self, s):
420
 
        ssh_server = paramiko.Transport(s)
421
 
        key_file = osutils.pathjoin(self._homedir, 'test_rsa.key')
422
 
        f = open(key_file, 'w')
423
 
        f.write(STUB_SERVER_KEY)
424
 
        f.close()
425
 
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
426
 
        ssh_server.add_server_key(host_key)
427
 
        server = self._server_interface(self)
428
 
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
429
 
                                         StubSFTPServer, root=self._root,
430
 
                                         home=self._server_homedir)
431
 
        event = threading.Event()
432
 
        ssh_server.start_server(event, server)
433
 
        event.wait(5.0)
434
 
 
435
 
    def start_server(self, backing_server=None):
436
 
        # XXX: TODO: make sftpserver back onto backing_server rather than local
437
 
        # disk.
438
 
        if not (backing_server is None or
439
 
                isinstance(backing_server, test_server.LocalURLServer)):
440
 
            raise AssertionError(
441
 
                "backing_server should not be %r, because this can only serve the "
442
 
                "local current working directory." % (backing_server,))
443
 
        self._original_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
444
 
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._vendor
445
 
        if sys.platform == 'win32':
446
 
            # Win32 needs to use the UNICODE api
447
 
            self._homedir = getcwd()
448
 
        else:
449
 
            # But Linux SFTP servers should just deal in bytestreams
450
 
            self._homedir = os.getcwd()
451
 
        if self._server_homedir is None:
452
 
            self._server_homedir = self._homedir
453
 
        self._root = '/'
454
 
        if sys.platform == 'win32':
455
 
            self._root = ''
456
 
        self._listener = SocketListener(self._run_server_entry)
457
 
        self._listener.setDaemon(True)
458
 
        self._listener.start()
459
 
 
460
 
    def stop_server(self):
461
 
        self._listener.stop()
462
 
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
463
 
 
464
 
    def get_bogus_url(self):
465
 
        """See bzrlib.transport.Server.get_bogus_url."""
466
 
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
467
 
        # we bind a random socket, so that we get a guaranteed unused port
468
 
        # we just never listen on that port
469
 
        s = socket.socket()
470
 
        s.bind(('localhost', 0))
471
 
        return 'sftp://%s:%s/' % s.getsockname()
472
 
 
473
 
 
474
 
class SFTPFullAbsoluteServer(SFTPServer):
475
 
    """A test server for sftp transports, using absolute urls and ssh."""
476
 
 
477
 
    def get_url(self):
478
 
        """See bzrlib.transport.Server.get_url."""
479
 
        homedir = self._homedir
480
 
        if sys.platform != 'win32':
481
 
            # Remove the initial '/' on all platforms but win32
482
 
            homedir = homedir[1:]
483
 
        return self._get_sftp_url(urlutils.escape(homedir))
484
 
 
485
 
 
486
 
class SFTPServerWithoutSSH(SFTPServer):
487
 
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
488
 
 
489
 
    def __init__(self):
490
 
        super(SFTPServerWithoutSSH, self).__init__()
491
 
        self._vendor = ssh.LoopbackVendor()
492
 
 
493
 
    def _run_server(self, sock):
494
 
        # Re-import these as locals, so that they're still accessible during
495
 
        # interpreter shutdown (when all module globals get set to None, leading
496
 
        # to confusing errors like "'NoneType' object has no attribute 'error'".
497
 
        class FakeChannel(object):
498
 
            def get_transport(self):
499
 
                return self
500
 
            def get_log_channel(self):
501
 
                return 'paramiko'
502
 
            def get_name(self):
503
 
                return '1'
504
 
            def get_hexdump(self):
505
 
                return False
506
 
            def close(self):
507
 
                pass
508
 
 
509
 
        server = paramiko.SFTPServer(
510
 
            FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
511
 
            root=self._root, home=self._server_homedir)
512
 
        try:
513
 
            server.start_subsystem(
514
 
                'sftp', None, ssh.SocketAsChannelAdapter(sock))
515
 
        except socket.error, e:
516
 
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
517
 
                # it's okay for the client to disconnect abruptly
518
 
                # (bug in paramiko 1.6: it should absorb this exception)
519
 
                pass
520
 
            else:
521
 
                raise
522
 
        except Exception, e:
523
 
            # This typically seems to happen during interpreter shutdown, so
524
 
            # most of the useful ways to report this error are won't work.
525
 
            # Writing the exception type, and then the text of the exception,
526
 
            # seems to be the best we can do.
527
 
            import sys
528
 
            sys.stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
529
 
            sys.stderr.write('%s\n\n' % (e,))
530
 
        server.finish_subsystem()
531
 
 
532
 
 
533
 
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
534
 
    """A test server for sftp transports, using absolute urls."""
535
 
 
536
 
    def get_url(self):
537
 
        """See bzrlib.transport.Server.get_url."""
538
 
        homedir = self._homedir
539
 
        if sys.platform != 'win32':
540
 
            # Remove the initial '/' on all platforms but win32
541
 
            homedir = homedir[1:]
542
 
        return self._get_sftp_url(urlutils.escape(homedir))
543
 
 
544
 
 
545
 
class SFTPHomeDirServer(SFTPServerWithoutSSH):
546
 
    """A test server for sftp transports, using homedir relative urls."""
547
 
 
548
 
    def get_url(self):
549
 
        """See bzrlib.transport.Server.get_url."""
550
 
        return self._get_sftp_url("~/")
551
 
 
552
 
 
553
 
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
554
 
    """A test server for sftp transports where only absolute paths will work.
555
 
 
556
 
    It does this by serving from a deeply-nested directory that doesn't exist.
557
 
    """
558
 
 
559
 
    def start_server(self, backing_server=None):
560
 
        self._server_homedir = '/dev/noone/runs/tests/here'
561
 
        super(SFTPSiblingAbsoluteServer, self).start_server(backing_server)
562