~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testsftp.py

  • Committer: Martin Pool
  • Date: 2005-11-04 01:46:31 UTC
  • mto: (1185.33.49 bzr.dev)
  • mto: This revision was merged to the branch mainline in revision 1512.
  • Revision ID: mbp@sourcefrog.net-20051104014631-750e0ad4172c952c
Make biobench directly executable

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
2
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
 
#
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
 
2
 
4
3
# This program is free software; you can redistribute it and/or modify
5
4
# it under the terms of the GNU General Public License as published by
6
5
# the Free Software Foundation; either version 2 of the License, or
7
6
# (at your option) any later version.
8
 
#
 
7
 
9
8
# This program is distributed in the hope that it will be useful,
10
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
11
# GNU General Public License for more details.
13
 
#
 
12
 
14
13
# You should have received a copy of the GNU General Public License
15
14
# along with this program; if not, write to the Free Software
16
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
16
 
18
17
import os
19
18
import socket
20
 
import sys
21
19
import threading
22
 
import time
 
20
import unittest
 
21
 
 
22
from bzrlib.selftest import TestCaseInTempDir
 
23
from bzrlib.selftest.testtransport import TestTransportMixIn
23
24
 
24
25
try:
25
26
    import paramiko
 
27
    from stub_sftp import StubServer, StubSFTPServer
26
28
    paramiko_loaded = True
27
29
except ImportError:
28
30
    paramiko_loaded = False
29
31
 
30
 
from bzrlib import (
31
 
    bzrdir,
32
 
    errors,
33
 
    )
34
 
from bzrlib.osutils import (
35
 
    pathjoin,
36
 
    lexists,
37
 
    set_or_unset_env,
38
 
    )
39
 
from bzrlib.tests import (
40
 
    TestCaseWithTransport,
41
 
    TestCase,
42
 
    TestSkipped,
43
 
    )
44
 
from bzrlib.tests.http_server import HttpServer
45
 
from bzrlib.transport import get_transport
46
 
import bzrlib.transport.http
47
 
 
48
 
if paramiko_loaded:
49
 
    from bzrlib.transport.sftp import (
50
 
        SFTPAbsoluteServer,
51
 
        SFTPHomeDirServer,
52
 
        SFTPTransport,
53
 
        )
54
 
 
55
 
from bzrlib.workingtree import WorkingTree
56
 
 
57
 
 
58
 
def set_test_transport_to_sftp(testcase):
59
 
    """A helper to set transports on test case instances."""
60
 
    if getattr(testcase, '_get_remote_is_absolute', None) is None:
61
 
        testcase._get_remote_is_absolute = True
62
 
    if testcase._get_remote_is_absolute:
63
 
        testcase.transport_server = SFTPAbsoluteServer
64
 
    else:
65
 
        testcase.transport_server = SFTPHomeDirServer
66
 
    testcase.transport_readonly_server = HttpServer
67
 
 
68
 
 
69
 
class TestCaseWithSFTPServer(TestCaseWithTransport):
70
 
    """A test case base class that provides a sftp server on localhost."""
71
 
 
72
 
    def setUp(self):
73
 
        super(TestCaseWithSFTPServer, self).setUp()
74
 
        if not paramiko_loaded:
75
 
            raise TestSkipped('you must have paramiko to run this test')
76
 
        set_test_transport_to_sftp(self)
77
 
 
78
 
 
79
 
class SFTPLockTests (TestCaseWithSFTPServer):
80
 
 
81
 
    def test_sftp_locks(self):
82
 
        from bzrlib.errors import LockError
83
 
        t = self.get_transport()
84
 
 
85
 
        l = t.lock_write('bogus')
86
 
        self.failUnlessExists('bogus.write-lock')
87
 
 
88
 
        # Don't wait for the lock, locking an already locked
89
 
        # file should raise an assert
90
 
        self.assertRaises(LockError, t.lock_write, 'bogus')
91
 
 
92
 
        l.unlock()
93
 
        self.failIf(lexists('bogus.write-lock'))
94
 
 
95
 
        open('something.write-lock', 'wb').write('fake lock\n')
96
 
        self.assertRaises(LockError, t.lock_write, 'something')
97
 
        os.remove('something.write-lock')
98
 
 
99
 
        l = t.lock_write('something')
100
 
 
101
 
        l2 = t.lock_write('bogus')
102
 
 
103
 
        l.unlock()
104
 
        l2.unlock()
105
 
 
106
 
 
107
 
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
108
 
    """Test the SFTP transport with homedir based relative paths."""
109
 
 
110
 
    def test__remote_path(self):
111
 
        if sys.platform == 'darwin':
112
 
            # This test is about sftp absolute path handling. There is already
113
 
            # (in this test) a TODO about windows needing an absolute path
114
 
            # without drive letter. To me, using self.test_dir is a trick to
115
 
            # get an absolute path for comparison purposes.  That fails for OSX
116
 
            # because the sftp server doesn't resolve the links (and it doesn't
117
 
            # have to). --vila 20070924
118
 
            self.knownFailure('Mac OSX symlinks /tmp to /private/tmp,'
119
 
                              ' testing against self.test_dir'
120
 
                              ' is not appropriate')
121
 
        t = self.get_transport()
122
 
        # This test require unix-like absolute path
123
 
        test_dir = self.test_dir
124
 
        if sys.platform == 'win32':
125
 
            # using hack suggested by John Meinel.
126
 
            # TODO: write another mock server for this test
127
 
            #       and use absolute path without drive letter
128
 
            test_dir = '/' + test_dir
129
 
        # try what is currently used:
130
 
        # remote path = self._abspath(relpath)
131
 
        self.assertIsSameRealPath(test_dir + '/relative',
132
 
                                  t._remote_path('relative'))
133
 
        # we dont os.path.join because windows gives us the wrong path
134
 
        root_segments = test_dir.split('/')
135
 
        root_parent = '/'.join(root_segments[:-1])
136
 
        # .. should be honoured
137
 
        self.assertIsSameRealPath(root_parent + '/sibling',
138
 
                                  t._remote_path('../sibling'))
139
 
        # /  should be illegal ?
140
 
        ### FIXME decide and then test for all transports. RBC20051208
141
 
 
142
 
 
143
 
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
144
 
    """Test the SFTP transport with homedir based relative paths."""
145
 
 
146
 
    def setUp(self):
147
 
        # Only SFTPHomeDirServer is tested here
148
 
        self._get_remote_is_absolute = False
149
 
        super(SFTPTransportTestRelativeRoot, self).setUp()
150
 
 
151
 
    def test__remote_path_relative_root(self):
152
 
        # relative paths are preserved
153
 
        t = self.get_transport('')
154
 
        self.assertEqual('/~/', t._path)
155
 
        # the remote path should be relative to home dir
156
 
        # (i.e. not begining with a '/')
157
 
        self.assertEqual('a', t._remote_path('a'))
158
 
 
159
 
 
160
 
class SFTPNonServerTest(TestCase):
161
 
    def setUp(self):
162
 
        TestCase.setUp(self)
163
 
        if not paramiko_loaded:
164
 
            raise TestSkipped('you must have paramiko to run this test')
165
 
 
166
 
    def test_parse_url_with_home_dir(self):
167
 
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative')
168
 
        self.assertEquals(s._host, 'example.com')
169
 
        self.assertEquals(s._port, 2222)
170
 
        self.assertEquals(s._user, 'robey')
171
 
        self.assertEquals(s._password, 'h@t')
172
 
        self.assertEquals(s._path, '/~/relative/')
173
 
 
174
 
    def test_relpath(self):
175
 
        s = SFTPTransport('sftp://user@host.com/abs/path')
176
 
        self.assertRaises(errors.PathNotChild, s.relpath,
177
 
                          'sftp://user@host.com/~/rel/path/sub')
178
 
 
179
 
    def test_get_paramiko_vendor(self):
180
 
        """Test that if no 'ssh' is available we get builtin paramiko"""
181
 
        from bzrlib.transport import ssh
182
 
        # set '.' as the only location in the path, forcing no 'ssh' to exist
183
 
        orig_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
184
 
        orig_path = set_or_unset_env('PATH', '.')
185
 
        try:
186
 
            # No vendor defined yet, query for one
187
 
            ssh._ssh_vendor_manager.clear_cache()
188
 
            vendor = ssh._get_ssh_vendor()
189
 
            self.assertIsInstance(vendor, ssh.ParamikoVendor)
190
 
        finally:
191
 
            set_or_unset_env('PATH', orig_path)
192
 
            ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
193
 
 
194
 
    def test_abspath_root_sibling_server(self):
195
 
        from bzrlib.transport.sftp import SFTPSiblingAbsoluteServer
196
 
        server = SFTPSiblingAbsoluteServer()
197
 
        server.setUp()
198
 
        try:
199
 
            transport = get_transport(server.get_url())
200
 
            self.assertFalse(transport.abspath('/').endswith('/~/'))
201
 
            self.assertTrue(transport.abspath('/').endswith('/'))
202
 
            del transport
203
 
        finally:
204
 
            server.tearDown()
205
 
 
206
 
 
207
 
class SFTPBranchTest(TestCaseWithSFTPServer):
208
 
    """Test some stuff when accessing a bzr Branch over sftp"""
209
 
 
210
 
    def test_lock_file(self):
211
 
        # old format branches use a special lock file on sftp.
212
 
        b = self.make_branch('', format=bzrdir.BzrDirFormat6())
213
 
        b = bzrlib.branch.Branch.open(self.get_url())
214
 
        self.failUnlessExists('.bzr/')
215
 
        self.failUnlessExists('.bzr/branch-format')
216
 
        self.failUnlessExists('.bzr/branch-lock')
217
 
 
218
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
219
 
        b.lock_write()
220
 
        self.failUnlessExists('.bzr/branch-lock.write-lock')
221
 
        b.unlock()
222
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
223
 
 
224
 
    def test_push_support(self):
225
 
        self.build_tree(['a/', 'a/foo'])
226
 
        t = bzrdir.BzrDir.create_standalone_workingtree('a')
227
 
        b = t.branch
228
 
        t.add('foo')
229
 
        t.commit('foo', rev_id='a1')
230
 
 
231
 
        b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
232
 
        b2.pull(b)
233
 
 
234
 
        self.assertEquals(b2.revision_history(), ['a1'])
235
 
 
236
 
        open('a/foo', 'wt').write('something new in foo\n')
237
 
        t.commit('new', rev_id='a2')
238
 
        b2.pull(b)
239
 
 
240
 
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
241
 
 
242
 
 
243
 
class SSHVendorConnection(TestCaseWithSFTPServer):
244
 
    """Test that the ssh vendors can all connect.
245
 
 
246
 
    Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
247
 
 
248
 
    We have 3 sftp implementations in the test suite:
249
 
      'loopback': Doesn't use ssh, just uses a local socket. Most tests are
250
 
                  done this way to save the handshaking time, so it is not
251
 
                  tested again here
252
 
      'none':     This uses paramiko's built-in ssh client and server, and layers
253
 
                  sftp on top of it.
254
 
      None:       If 'ssh' exists on the machine, then it will be spawned as a
255
 
                  child process.
256
 
    """
257
 
    
258
 
    def setUp(self):
259
 
        super(SSHVendorConnection, self).setUp()
260
 
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
261
 
 
262
 
        def create_server():
263
 
            """Just a wrapper so that when created, it will set _vendor"""
264
 
            # SFTPFullAbsoluteServer can handle any vendor,
265
 
            # it just needs to be set between the time it is instantiated
266
 
            # and the time .setUp() is called
267
 
            server = SFTPFullAbsoluteServer()
268
 
            server._vendor = self._test_vendor
269
 
            return server
270
 
        self._test_vendor = 'loopback'
271
 
        self.vfs_transport_server = create_server
272
 
        f = open('a_file', 'wb')
273
 
        try:
274
 
            f.write('foobar\n')
275
 
        finally:
276
 
            f.close()
277
 
 
278
 
    def set_vendor(self, vendor):
279
 
        self._test_vendor = vendor
280
 
 
281
 
    def test_connection_paramiko(self):
282
 
        from bzrlib.transport import ssh
283
 
        self.set_vendor(ssh.ParamikoVendor())
284
 
        t = self.get_transport()
285
 
        self.assertEqual('foobar\n', t.get('a_file').read())
286
 
 
287
 
    def test_connection_vendor(self):
288
 
        raise TestSkipped("We don't test spawning real ssh,"
289
 
                          " because it prompts for a password."
290
 
                          " Enable this test if we figure out"
291
 
                          " how to prevent this.")
292
 
        self.set_vendor(None)
293
 
        t = self.get_transport()
294
 
        self.assertEqual('foobar\n', t.get('a_file').read())
295
 
 
296
 
 
297
 
class SSHVendorBadConnection(TestCaseWithTransport):
298
 
    """Test that the ssh vendors handle bad connection properly
299
 
 
300
 
    We don't subclass TestCaseWithSFTPServer, because we don't actually
301
 
    need an SFTP connection.
302
 
    """
303
 
 
304
 
    def setUp(self):
305
 
        if not paramiko_loaded:
306
 
            raise TestSkipped('you must have paramiko to run this test')
307
 
        super(SSHVendorBadConnection, self).setUp()
308
 
        import bzrlib.transport.ssh
309
 
 
310
 
        # open a random port, so we know nobody else is using it
311
 
        # but don't actually listen on the port.
312
 
        s = socket.socket()
313
 
        s.bind(('localhost', 0))
314
 
        self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
315
 
 
316
 
        orig_vendor = bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor
317
 
        def reset():
318
 
            bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
319
 
            s.close()
320
 
        self.addCleanup(reset)
321
 
 
322
 
    def set_vendor(self, vendor):
323
 
        import bzrlib.transport.ssh
324
 
        bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = vendor
325
 
 
326
 
    def test_bad_connection_paramiko(self):
327
 
        """Test that a real connection attempt raises the right error"""
328
 
        from bzrlib.transport import ssh
329
 
        self.set_vendor(ssh.ParamikoVendor())
330
 
        t = bzrlib.transport.get_transport(self.bogus_url)
331
 
        self.assertRaises(errors.ConnectionError, t.get, 'foobar')
332
 
 
333
 
    def test_bad_connection_ssh(self):
334
 
        """None => auto-detect vendor"""
335
 
        self.set_vendor(None)
336
 
        # This is how I would normally test the connection code
337
 
        # it makes it very clear what we are testing.
338
 
        # However, 'ssh' will create stipple on the output, so instead
339
 
        # I'm using run_bzr_subprocess, and parsing the output
340
 
        # try:
341
 
        #     t = bzrlib.transport.get_transport(self.bogus_url)
342
 
        # except errors.ConnectionError:
343
 
        #     # Correct error
344
 
        #     pass
345
 
        # except errors.NameError, e:
346
 
        #     if 'SSHException' in str(e):
347
 
        #         raise TestSkipped('Known NameError bug in paramiko 1.6.1')
348
 
        #     raise
349
 
        # else:
350
 
        #     self.fail('Excepted ConnectionError to be raised')
351
 
 
352
 
        out, err = self.run_bzr_subprocess(['log', self.bogus_url], retcode=3)
353
 
        self.assertEqual('', out)
354
 
        if "NameError: global name 'SSHException'" in err:
355
 
            # We aren't fixing this bug, because it is a bug in
356
 
            # paramiko, but we know about it, so we don't have to
357
 
            # fail the test
358
 
            raise TestSkipped('Known NameError bug with paramiko-1.6.1')
359
 
        self.assertContainsRe(err, r'bzr: ERROR: Unable to connect to SSH host'
360
 
                                   r' 127\.0\.0\.1:\d+; ')
361
 
 
362
 
 
363
 
class SFTPLatencyKnob(TestCaseWithSFTPServer):
364
 
    """Test that the testing SFTPServer's latency knob works."""
365
 
 
366
 
    def test_latency_knob_slows_transport(self):
367
 
        # change the latency knob to 500ms. We take about 40ms for a 
368
 
        # loopback connection ordinarily.
369
 
        start_time = time.time()
370
 
        self.get_server().add_latency = 0.5
371
 
        transport = self.get_transport()
372
 
        transport.has('not me') # Force connection by issuing a request
373
 
        with_latency_knob_time = time.time() - start_time
374
 
        self.assertTrue(with_latency_knob_time > 0.4)
375
 
 
376
 
    def test_default(self):
377
 
        # This test is potentially brittle: under extremely high machine load
378
 
        # it could fail, but that is quite unlikely
379
 
        raise TestSkipped('Timing-sensitive test')
380
 
        start_time = time.time()
381
 
        transport = self.get_transport()
382
 
        transport.has('not me') # Force connection by issuing a request
383
 
        regular_time = time.time() - start_time
384
 
        self.assertTrue(regular_time < 0.5)
385
 
 
386
 
 
387
 
class FakeSocket(object):
388
 
    """Fake socket object used to test the SocketDelay wrapper without
389
 
    using a real socket.
390
 
    """
391
 
 
392
 
    def __init__(self):
393
 
        self._data = ""
394
 
 
395
 
    def send(self, data, flags=0):
396
 
        self._data += data
397
 
        return len(data)
398
 
 
399
 
    def sendall(self, data, flags=0):
400
 
        self._data += data
401
 
        return len(data)
402
 
 
403
 
    def recv(self, size, flags=0):
404
 
        if size < len(self._data):
405
 
            result = self._data[:size]
406
 
            self._data = self._data[size:]
407
 
            return result
408
 
        else:
409
 
            result = self._data
410
 
            self._data = ""
411
 
            return result
412
 
 
413
 
 
414
 
class TestSocketDelay(TestCase):
415
 
 
416
 
    def setUp(self):
417
 
        TestCase.setUp(self)
418
 
        if not paramiko_loaded:
419
 
            raise TestSkipped('you must have paramiko to run this test')
420
 
 
421
 
    def test_delay(self):
422
 
        from bzrlib.transport.sftp import SocketDelay
423
 
        sending = FakeSocket()
424
 
        receiving = SocketDelay(sending, 0.1, bandwidth=1000000,
425
 
                                really_sleep=False)
426
 
        # check that simulated time is charged only per round-trip:
427
 
        t1 = SocketDelay.simulated_time
428
 
        receiving.send("connect1")
429
 
        self.assertEqual(sending.recv(1024), "connect1")
430
 
        t2 = SocketDelay.simulated_time
431
 
        self.assertAlmostEqual(t2 - t1, 0.1)
432
 
        receiving.send("connect2")
433
 
        self.assertEqual(sending.recv(1024), "connect2")
434
 
        sending.send("hello")
435
 
        self.assertEqual(receiving.recv(1024), "hello")
436
 
        t3 = SocketDelay.simulated_time
437
 
        self.assertAlmostEqual(t3 - t2, 0.1)
438
 
        sending.send("hello")
439
 
        self.assertEqual(receiving.recv(1024), "hello")
440
 
        sending.send("hello")
441
 
        self.assertEqual(receiving.recv(1024), "hello")
442
 
        sending.send("hello")
443
 
        self.assertEqual(receiving.recv(1024), "hello")
444
 
        t4 = SocketDelay.simulated_time
445
 
        self.assertAlmostEqual(t4, t3)
446
 
 
447
 
    def test_bandwidth(self):
448
 
        from bzrlib.transport.sftp import SocketDelay
449
 
        sending = FakeSocket()
450
 
        receiving = SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
451
 
                                really_sleep=False)
452
 
        # check that simulated time is charged only per round-trip:
453
 
        t1 = SocketDelay.simulated_time
454
 
        receiving.send("connect")
455
 
        self.assertEqual(sending.recv(1024), "connect")
456
 
        sending.send("a" * 100)
457
 
        self.assertEqual(receiving.recv(1024), "a" * 100)
458
 
        t2 = SocketDelay.simulated_time
459
 
        self.assertAlmostEqual(t2 - t1, 100 + 7)
460
 
 
461
 
 
 
32
 
 
33
STUB_SERVER_KEY = """
 
34
-----BEGIN RSA PRIVATE KEY-----
 
35
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
36
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
37
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
38
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
39
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
40
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
41
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
42
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
43
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
44
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
45
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
46
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
47
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
48
-----END RSA PRIVATE KEY-----
 
49
"""
 
50
    
 
51
 
 
52
class SingleListener (threading.Thread):
 
53
    def __init__(self, callback):
 
54
        threading.Thread.__init__(self)
 
55
        self._callback = callback
 
56
        self._socket = socket.socket()
 
57
        self._socket.listen(1)
 
58
        self.port = self._socket.getsockname()[1]
 
59
        self.stop_event = threading.Event()
 
60
 
 
61
    def run(self):
 
62
        s, _ = self._socket.accept()
 
63
        # now close the listen socket
 
64
        self._socket.close()
 
65
        self._callback(s, self.stop_event)
 
66
    
 
67
    def stop(self):
 
68
        self.stop_event.set()
 
69
        
 
70
        
 
71
class TestCaseWithSFTPServer (TestCaseInTempDir):
 
72
    """
 
73
    Execute a test case with a stub SFTP server, serving files from the local
 
74
    filesystem over the loopback network.
 
75
    """
 
76
    
 
77
    def _run_server(self, s, stop_event):
 
78
        ssh_server = paramiko.Transport(s)
 
79
        key_file = os.path.join(self._root, 'test_rsa.key')
 
80
        file(key_file, 'w').write(STUB_SERVER_KEY)
 
81
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
82
        ssh_server.add_server_key(host_key)
 
83
        server = StubServer()
 
84
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
 
85
        event = threading.Event()
 
86
        ssh_server.start_server(event, server)
 
87
        event.wait(5.0)
 
88
        stop_event.wait(30.0)
 
89
 
 
90
    def setUp(self):
 
91
        TestCaseInTempDir.setUp(self)
 
92
        self._root = self.test_dir
 
93
 
 
94
        self._listener = SingleListener(self._run_server)
 
95
        self._listener.setDaemon(True)
 
96
        self._listener.start()        
 
97
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
 
98
        
 
99
    def tearDown(self):
 
100
        self._listener.stop()
 
101
        TestCaseInTempDir.tearDown(self)
 
102
 
 
103
        
 
104
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
 
105
    readonly = False
 
106
 
 
107
    def get_transport(self):
 
108
        from bzrlib.transport.sftp import SFTPTransport
 
109
        url = self._sftp_url
 
110
        return SFTPTransport(url)
 
111
 
 
112
if not paramiko_loaded:
 
113
    del SFTPTransportTest