~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testsftp.py

Exclude more files from dumb-rsync upload

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
2
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
 
#
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
 
2
 
4
3
# This program is free software; you can redistribute it and/or modify
5
4
# it under the terms of the GNU General Public License as published by
6
5
# the Free Software Foundation; either version 2 of the License, or
7
6
# (at your option) any later version.
8
 
#
 
7
 
9
8
# This program is distributed in the hope that it will be useful,
10
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
11
# GNU General Public License for more details.
13
 
#
 
12
 
14
13
# You should have received a copy of the GNU General Public License
15
14
# along with this program; if not, write to the Free Software
16
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
16
 
18
17
import os
19
18
import socket
20
 
import sys
21
19
import threading
22
 
import time
 
20
import unittest
23
21
 
24
 
import bzrlib.bzrdir as bzrdir
25
 
import bzrlib.errors as errors
26
 
from bzrlib.osutils import pathjoin, lexists, set_or_unset_env
27
 
from bzrlib.tests import TestCaseWithTransport, TestCase, TestSkipped
28
 
from bzrlib.tests.HttpServer import HttpServer
29
 
import bzrlib.transport
30
 
from bzrlib.transport import get_transport
31
 
import bzrlib.transport.http
32
 
from bzrlib.workingtree import WorkingTree
 
22
from bzrlib.selftest import TestCaseInTempDir
 
23
from bzrlib.selftest.testtransport import TestTransportMixIn
33
24
 
34
25
try:
35
26
    import paramiko
 
27
    from stub_sftp import StubServer, StubSFTPServer
36
28
    paramiko_loaded = True
37
29
except ImportError:
38
30
    paramiko_loaded = False
39
31
 
40
32
 
41
 
def set_test_transport_to_sftp(testcase):
42
 
    """A helper to set transports on test case instances."""
43
 
    from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
44
 
    if getattr(testcase, '_get_remote_is_absolute', None) is None:
45
 
        testcase._get_remote_is_absolute = True
46
 
    if testcase._get_remote_is_absolute:
47
 
        testcase.transport_server = SFTPAbsoluteServer
48
 
    else:
49
 
        testcase.transport_server = SFTPHomeDirServer
50
 
    testcase.transport_readonly_server = HttpServer
51
 
 
52
 
 
53
 
class TestCaseWithSFTPServer(TestCaseWithTransport):
54
 
    """A test case base class that provides a sftp server on localhost."""
55
 
 
56
 
    def setUp(self):
57
 
        super(TestCaseWithSFTPServer, self).setUp()
58
 
        if not paramiko_loaded:
59
 
            raise TestSkipped('you must have paramiko to run this test')
60
 
        set_test_transport_to_sftp(self)
61
 
 
62
 
    def get_transport(self, path=None):
63
 
        """Return a transport relative to self._test_root."""
64
 
        return bzrlib.transport.get_transport(self.get_url(path))
65
 
 
66
 
 
67
 
class SFTPLockTests (TestCaseWithSFTPServer):
68
 
 
69
 
    def test_sftp_locks(self):
70
 
        from bzrlib.errors import LockError
71
 
        t = self.get_transport()
72
 
 
73
 
        l = t.lock_write('bogus')
74
 
        self.failUnlessExists('bogus.write-lock')
75
 
 
76
 
        # Don't wait for the lock, locking an already locked
77
 
        # file should raise an assert
78
 
        self.assertRaises(LockError, t.lock_write, 'bogus')
79
 
 
80
 
        l.unlock()
81
 
        self.failIf(lexists('bogus.write-lock'))
82
 
 
83
 
        open('something.write-lock', 'wb').write('fake lock\n')
84
 
        self.assertRaises(LockError, t.lock_write, 'something')
85
 
        os.remove('something.write-lock')
86
 
 
87
 
        l = t.lock_write('something')
88
 
 
89
 
        l2 = t.lock_write('bogus')
90
 
 
91
 
        l.unlock()
92
 
        l2.unlock()
93
 
 
94
 
    def test_multiple_connections(self):
95
 
        t = self.get_transport()
96
 
        self.assertTrue('sftpserver - new connection' in self.get_server().logs)
97
 
        self.get_server().logs = []
98
 
        # The second request should reuse the first connection
99
 
        # SingleListener only allows for a single connection,
100
 
        # So the next line fails unless the connection is reused
101
 
        t2 = self.get_transport()
102
 
        self.assertEquals(self.get_server().logs, [])
103
 
 
104
 
 
105
 
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
106
 
    """Test the SFTP transport with homedir based relative paths."""
107
 
 
108
 
    def test__remote_path(self):
109
 
        t = self.get_transport()
110
 
        # This test require unix-like absolute path
111
 
        test_dir = self.test_dir
112
 
        if sys.platform == 'win32':
113
 
            # using hack suggested by John Meinel.
114
 
            # TODO: write another mock server for this test
115
 
            #       and use absolute path without drive letter
116
 
            test_dir = '/' + test_dir
117
 
        # try what is currently used:
118
 
        # remote path = self._abspath(relpath)
119
 
        self.assertEqual(test_dir + '/relative', t._remote_path('relative'))
120
 
        # we dont os.path.join because windows gives us the wrong path
121
 
        root_segments = test_dir.split('/')
122
 
        root_parent = '/'.join(root_segments[:-1])
123
 
        # .. should be honoured
124
 
        self.assertEqual(root_parent + '/sibling', t._remote_path('../sibling'))
125
 
        # /  should be illegal ?
126
 
        ### FIXME decide and then test for all transports. RBC20051208
127
 
 
128
 
 
129
 
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
130
 
    """Test the SFTP transport with homedir based relative paths."""
131
 
 
132
 
    def setUp(self):
133
 
        self._get_remote_is_absolute = False
134
 
        super(SFTPTransportTestRelativeRoot, self).setUp()
135
 
 
136
 
    def test__remote_path_relative_root(self):
137
 
        # relative paths are preserved
138
 
        t = self.get_transport('')
139
 
        # the remote path should be ''
140
 
        self.assertEqual('', t._path)
141
 
        self.assertEqual('a', t._remote_path('a'))
142
 
 
143
 
 
144
 
class FakeSFTPTransport (object):
145
 
    _sftp = object()
146
 
fake = FakeSFTPTransport()
147
 
 
148
 
 
149
 
class SFTPNonServerTest(TestCase):
150
 
    def setUp(self):
151
 
        TestCase.setUp(self)
152
 
        if not paramiko_loaded:
153
 
            raise TestSkipped('you must have paramiko to run this test')
154
 
 
155
 
    def test_parse_url(self):
156
 
        from bzrlib.transport.sftp import SFTPTransport
157
 
        s = SFTPTransport('sftp://simple.example.com/home/source', clone_from=fake)
158
 
        self.assertEquals(s._host, 'simple.example.com')
159
 
        self.assertEquals(s._port, None)
160
 
        self.assertEquals(s._path, '/home/source')
161
 
        self.failUnless(s._password is None)
162
 
 
163
 
        self.assertEquals(s.base, 'sftp://simple.example.com/home/source/')
164
 
 
165
 
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative', clone_from=fake)
166
 
        self.assertEquals(s._host, 'example.com')
167
 
        self.assertEquals(s._port, 2222)
168
 
        self.assertEquals(s._username, 'robey')
169
 
        self.assertEquals(s._password, 'h@t')
170
 
        self.assertEquals(s._path, 'relative')
171
 
 
172
 
        # Base should not keep track of the password
173
 
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/~/relative/')
174
 
 
175
 
    def test_relpath(self):
176
 
        from bzrlib.transport.sftp import SFTPTransport
177
 
        from bzrlib.errors import PathNotChild
178
 
 
179
 
        s = SFTPTransport('sftp://user@host.com/abs/path', clone_from=fake)
180
 
        self.assertEquals(s.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
181
 
        # Can't test this one, because we actually get an AssertionError
182
 
        # TODO: Consider raising an exception rather than an assert
183
 
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com/abs/path/sub')
184
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com/abs/path/sub')
185
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com/abs/path/sub')
186
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33/abs/path/sub')
187
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/~/rel/path/sub')
188
 
 
189
 
        # Make sure it works when we don't supply a username
190
 
        s = SFTPTransport('sftp://host.com/abs/path', clone_from=fake)
191
 
        self.assertEquals(s.relpath('sftp://host.com/abs/path/sub'), 'sub')
192
 
 
193
 
        # Make sure it works when parts of the path will be url encoded
194
 
        # TODO: These may be incorrect, we might need to urllib.urlencode() before
195
 
        # we pass the paths into the SFTPTransport constructor
196
 
        s = SFTPTransport('sftp://host.com/dev/,path', clone_from=fake)
197
 
        self.assertEquals(s.relpath('sftp://host.com/dev/,path/sub'), 'sub')
198
 
        s = SFTPTransport('sftp://host.com/dev/%path', clone_from=fake)
199
 
        self.assertEquals(s.relpath('sftp://host.com/dev/%path/sub'), 'sub')
200
 
 
201
 
    def test_parse_invalid_url(self):
202
 
        from bzrlib.transport.sftp import SFTPTransport, TransportError
203
 
        try:
204
 
            s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
205
 
                              clone_from=fake)
206
 
            self.fail('expected exception not raised')
207
 
        except TransportError, e:
208
 
            self.assertEquals(str(e),
209
 
                    'Transport error: '
210
 
                    'invalid port number ~janneke in url:\n'
211
 
                    'sftp://lilypond.org:~janneke/public_html/bzr/gub ')
212
 
 
213
 
    def test_get_paramiko_vendor(self):
214
 
        """Test that if no 'ssh' is available we get builtin paramiko"""
215
 
        from bzrlib.transport import ssh
216
 
        # set '.' as the only location in the path, forcing no 'ssh' to exist
217
 
        orig_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
218
 
        orig_path = set_or_unset_env('PATH', '.')
219
 
        try:
220
 
            # No vendor defined yet, query for one
221
 
            ssh._ssh_vendor_manager.clear_cache()
222
 
            vendor = ssh._get_ssh_vendor()
223
 
            self.assertIsInstance(vendor, ssh.ParamikoVendor)
224
 
        finally:
225
 
            set_or_unset_env('PATH', orig_path)
226
 
            ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
227
 
 
228
 
    def test_abspath_root_sibling_server(self):
229
 
        from bzrlib.transport.sftp import SFTPSiblingAbsoluteServer
230
 
        server = SFTPSiblingAbsoluteServer()
231
 
        server.setUp()
232
 
        try:
233
 
            transport = get_transport(server.get_url())
234
 
            self.assertFalse(transport.abspath('/').endswith('/~/'))
235
 
            self.assertTrue(transport.abspath('/').endswith('/'))
236
 
            del transport
237
 
        finally:
238
 
            server.tearDown()
239
 
 
240
 
 
241
 
class SFTPBranchTest(TestCaseWithSFTPServer):
242
 
    """Test some stuff when accessing a bzr Branch over sftp"""
243
 
 
244
 
    def test_lock_file(self):
245
 
        # old format branches use a special lock file on sftp.
246
 
        b = self.make_branch('', format=bzrdir.BzrDirFormat6())
247
 
        b = bzrlib.branch.Branch.open(self.get_url())
248
 
        self.failUnlessExists('.bzr/')
249
 
        self.failUnlessExists('.bzr/branch-format')
250
 
        self.failUnlessExists('.bzr/branch-lock')
251
 
 
252
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
253
 
        b.lock_write()
254
 
        self.failUnlessExists('.bzr/branch-lock.write-lock')
255
 
        b.unlock()
256
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
257
 
 
258
 
    def test_push_support(self):
259
 
        self.build_tree(['a/', 'a/foo'])
260
 
        t = bzrdir.BzrDir.create_standalone_workingtree('a')
261
 
        b = t.branch
262
 
        t.add('foo')
263
 
        t.commit('foo', rev_id='a1')
264
 
 
265
 
        b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
266
 
        b2.pull(b)
267
 
 
268
 
        self.assertEquals(b2.revision_history(), ['a1'])
269
 
 
270
 
        open('a/foo', 'wt').write('something new in foo\n')
271
 
        t.commit('new', rev_id='a2')
272
 
        b2.pull(b)
273
 
 
274
 
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
275
 
 
276
 
 
277
 
class SSHVendorConnection(TestCaseWithSFTPServer):
278
 
    """Test that the ssh vendors can all connect.
279
 
 
280
 
    Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
281
 
 
282
 
    We have 3 sftp implementations in the test suite:
283
 
      'loopback': Doesn't use ssh, just uses a local socket. Most tests are
284
 
                  done this way to save the handshaking time, so it is not
285
 
                  tested again here
286
 
      'none':     This uses paramiko's built-in ssh client and server, and layers
287
 
                  sftp on top of it.
288
 
      None:       If 'ssh' exists on the machine, then it will be spawned as a
289
 
                  child process.
290
 
    """
291
 
    
292
 
    def setUp(self):
293
 
        super(SSHVendorConnection, self).setUp()
294
 
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
295
 
 
296
 
        def create_server():
297
 
            """Just a wrapper so that when created, it will set _vendor"""
298
 
            # SFTPFullAbsoluteServer can handle any vendor,
299
 
            # it just needs to be set between the time it is instantiated
300
 
            # and the time .setUp() is called
301
 
            server = SFTPFullAbsoluteServer()
302
 
            server._vendor = self._test_vendor
303
 
            return server
304
 
        self._test_vendor = 'loopback'
305
 
        self.vfs_transport_server = create_server
306
 
        f = open('a_file', 'wb')
307
 
        try:
308
 
            f.write('foobar\n')
309
 
        finally:
310
 
            f.close()
311
 
 
312
 
    def set_vendor(self, vendor):
313
 
        self._test_vendor = vendor
314
 
 
315
 
    def test_connection_paramiko(self):
316
 
        from bzrlib.transport import ssh
317
 
        self.set_vendor(ssh.ParamikoVendor())
318
 
        t = self.get_transport()
319
 
        self.assertEqual('foobar\n', t.get('a_file').read())
320
 
 
321
 
    def test_connection_vendor(self):
322
 
        raise TestSkipped("We don't test spawning real ssh,"
323
 
                          " because it prompts for a password."
324
 
                          " Enable this test if we figure out"
325
 
                          " how to prevent this.")
326
 
        self.set_vendor(None)
327
 
        t = self.get_transport()
328
 
        self.assertEqual('foobar\n', t.get('a_file').read())
329
 
 
330
 
 
331
 
class SSHVendorBadConnection(TestCaseWithTransport):
332
 
    """Test that the ssh vendors handle bad connection properly
333
 
 
334
 
    We don't subclass TestCaseWithSFTPServer, because we don't actually
335
 
    need an SFTP connection.
336
 
    """
337
 
 
338
 
    def setUp(self):
339
 
        if not paramiko_loaded:
340
 
            raise TestSkipped('you must have paramiko to run this test')
341
 
        super(SSHVendorBadConnection, self).setUp()
342
 
        import bzrlib.transport.ssh
343
 
 
344
 
        # open a random port, so we know nobody else is using it
345
 
        # but don't actually listen on the port.
346
 
        s = socket.socket()
347
 
        s.bind(('localhost', 0))
348
 
        self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
349
 
 
350
 
        orig_vendor = bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor
351
 
        def reset():
352
 
            bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
353
 
            s.close()
354
 
        self.addCleanup(reset)
355
 
 
356
 
    def set_vendor(self, vendor):
357
 
        import bzrlib.transport.ssh
358
 
        bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = vendor
359
 
 
360
 
    def test_bad_connection_paramiko(self):
361
 
        """Test that a real connection attempt raises the right error"""
362
 
        from bzrlib.transport import ssh
363
 
        self.set_vendor(ssh.ParamikoVendor())
364
 
        self.assertRaises(errors.ConnectionError,
365
 
                          bzrlib.transport.get_transport, self.bogus_url)
366
 
 
367
 
    def test_bad_connection_ssh(self):
368
 
        """None => auto-detect vendor"""
369
 
        self.set_vendor(None)
370
 
        # This is how I would normally test the connection code
371
 
        # it makes it very clear what we are testing.
372
 
        # However, 'ssh' will create stipple on the output, so instead
373
 
        # I'm using run_bzr_subprocess, and parsing the output
374
 
        # try:
375
 
        #     t = bzrlib.transport.get_transport(self.bogus_url)
376
 
        # except errors.ConnectionError:
377
 
        #     # Correct error
378
 
        #     pass
379
 
        # except errors.NameError, e:
380
 
        #     if 'SSHException' in str(e):
381
 
        #         raise TestSkipped('Known NameError bug in paramiko 1.6.1')
382
 
        #     raise
383
 
        # else:
384
 
        #     self.fail('Excepted ConnectionError to be raised')
385
 
 
386
 
        out, err = self.run_bzr_subprocess('log', self.bogus_url, retcode=3)
387
 
        self.assertEqual('', out)
388
 
        if "NameError: global name 'SSHException'" in err:
389
 
            # We aren't fixing this bug, because it is a bug in
390
 
            # paramiko, but we know about it, so we don't have to
391
 
            # fail the test
392
 
            raise TestSkipped('Known NameError bug with paramiko-1.6.1')
393
 
        self.assertContainsRe(err, r'bzr: ERROR: Unable to connect to SSH host'
394
 
                                   r' 127\.0\.0\.1:\d+; ')
395
 
 
396
 
 
397
 
class SFTPLatencyKnob(TestCaseWithSFTPServer):
398
 
    """Test that the testing SFTPServer's latency knob works."""
399
 
 
400
 
    def test_latency_knob_slows_transport(self):
401
 
        # change the latency knob to 500ms. We take about 40ms for a 
402
 
        # loopback connection ordinarily.
403
 
        start_time = time.time()
404
 
        self.get_server().add_latency = 0.5
405
 
        transport = self.get_transport()
406
 
        with_latency_knob_time = time.time() - start_time
407
 
        self.assertTrue(with_latency_knob_time > 0.4)
408
 
 
409
 
    def test_default(self):
410
 
        # This test is potentially brittle: under extremely high machine load
411
 
        # it could fail, but that is quite unlikely
412
 
        start_time = time.time()
413
 
        transport = self.get_transport()
414
 
        regular_time = time.time() - start_time
415
 
        self.assertTrue(regular_time < 0.5)
416
 
 
417
 
 
418
 
class FakeSocket(object):
419
 
    """Fake socket object used to test the SocketDelay wrapper without
420
 
    using a real socket.
421
 
    """
422
 
 
423
 
    def __init__(self):
424
 
        self._data = ""
425
 
 
426
 
    def send(self, data, flags=0):
427
 
        self._data += data
428
 
        return len(data)
429
 
 
430
 
    def sendall(self, data, flags=0):
431
 
        self._data += data
432
 
        return len(data)
433
 
 
434
 
    def recv(self, size, flags=0):
435
 
        if size < len(self._data):
436
 
            result = self._data[:size]
437
 
            self._data = self._data[size:]
438
 
            return result
439
 
        else:
440
 
            result = self._data
441
 
            self._data = ""
442
 
            return result
443
 
 
444
 
 
445
 
class TestSocketDelay(TestCase):
446
 
 
447
 
    def setUp(self):
448
 
        TestCase.setUp(self)
449
 
        if not paramiko_loaded:
450
 
            raise TestSkipped('you must have paramiko to run this test')
451
 
 
452
 
    def test_delay(self):
453
 
        from bzrlib.transport.sftp import SocketDelay
454
 
        sending = FakeSocket()
455
 
        receiving = SocketDelay(sending, 0.1, bandwidth=1000000,
456
 
                                really_sleep=False)
457
 
        # check that simulated time is charged only per round-trip:
458
 
        t1 = SocketDelay.simulated_time
459
 
        receiving.send("connect1")
460
 
        self.assertEqual(sending.recv(1024), "connect1")
461
 
        t2 = SocketDelay.simulated_time
462
 
        self.assertAlmostEqual(t2 - t1, 0.1)
463
 
        receiving.send("connect2")
464
 
        self.assertEqual(sending.recv(1024), "connect2")
465
 
        sending.send("hello")
466
 
        self.assertEqual(receiving.recv(1024), "hello")
467
 
        t3 = SocketDelay.simulated_time
468
 
        self.assertAlmostEqual(t3 - t2, 0.1)
469
 
        sending.send("hello")
470
 
        self.assertEqual(receiving.recv(1024), "hello")
471
 
        sending.send("hello")
472
 
        self.assertEqual(receiving.recv(1024), "hello")
473
 
        sending.send("hello")
474
 
        self.assertEqual(receiving.recv(1024), "hello")
475
 
        t4 = SocketDelay.simulated_time
476
 
        self.assertAlmostEqual(t4, t3)
477
 
 
478
 
    def test_bandwidth(self):
479
 
        from bzrlib.transport.sftp import SocketDelay
480
 
        sending = FakeSocket()
481
 
        receiving = SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
482
 
                                really_sleep=False)
483
 
        # check that simulated time is charged only per round-trip:
484
 
        t1 = SocketDelay.simulated_time
485
 
        receiving.send("connect")
486
 
        self.assertEqual(sending.recv(1024), "connect")
487
 
        sending.send("a" * 100)
488
 
        self.assertEqual(receiving.recv(1024), "a" * 100)
489
 
        t2 = SocketDelay.simulated_time
490
 
        self.assertAlmostEqual(t2 - t1, 100 + 7)
491
 
 
492
 
 
 
33
STUB_SERVER_KEY = """
 
34
-----BEGIN RSA PRIVATE KEY-----
 
35
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
36
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
37
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
38
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
39
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
40
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
41
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
42
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
43
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
44
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
45
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
46
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
47
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
48
-----END RSA PRIVATE KEY-----
 
49
"""
 
50
    
 
51
 
 
52
class SingleListener (threading.Thread):
 
53
    def __init__(self, callback):
 
54
        threading.Thread.__init__(self)
 
55
        self._callback = callback
 
56
        self._socket = socket.socket()
 
57
        self._socket.listen(1)
 
58
        self.port = self._socket.getsockname()[1]
 
59
        self.stop_event = threading.Event()
 
60
 
 
61
    def run(self):
 
62
        s, _ = self._socket.accept()
 
63
        # now close the listen socket
 
64
        self._socket.close()
 
65
        self._callback(s, self.stop_event)
 
66
    
 
67
    def stop(self):
 
68
        self.stop_event.set()
 
69
        
 
70
        
 
71
class TestCaseWithSFTPServer (TestCaseInTempDir):
 
72
    """
 
73
    Execute a test case with a stub SFTP server, serving files from the local
 
74
    filesystem over the loopback network.
 
75
    """
 
76
    
 
77
    def _run_server(self, s, stop_event):
 
78
        ssh_server = paramiko.Transport(s)
 
79
        key_file = os.path.join(self._root, 'test_rsa.key')
 
80
        file(key_file, 'w').write(STUB_SERVER_KEY)
 
81
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
82
        ssh_server.add_server_key(host_key)
 
83
        server = StubServer()
 
84
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
 
85
        event = threading.Event()
 
86
        ssh_server.start_server(event, server)
 
87
        event.wait(5.0)
 
88
        stop_event.wait(30.0)
 
89
 
 
90
    def setUp(self):
 
91
        TestCaseInTempDir.setUp(self)
 
92
        self._root = self.test_dir
 
93
 
 
94
        self._listener = SingleListener(self._run_server)
 
95
        self._listener.setDaemon(True)
 
96
        self._listener.start()        
 
97
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
 
98
        
 
99
    def tearDown(self):
 
100
        self._listener.stop()
 
101
        TestCaseInTempDir.tearDown(self)
 
102
 
 
103
        
 
104
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
 
105
    readonly = False
 
106
 
 
107
    def get_transport(self):
 
108
        from bzrlib.transport.sftp import SFTPTransport
 
109
        url = self._sftp_url
 
110
        return SFTPTransport(url)
 
111
 
 
112
if not paramiko_loaded:
 
113
    del SFTPTransportTest