~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testsftp.py

  • Committer: John Arbash Meinel
  • Date: 2005-11-14 17:02:35 UTC
  • mto: (1587.1.6 bound-branches)
  • mto: This revision was merged to the branch mainline in revision 1590.
  • Revision ID: john@arbash-meinel.com-20051114170235-f85afa458bae956e
Fixing up the error message for a failed bind.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
import os
18
18
import socket
19
19
import threading
 
20
import unittest
20
21
 
21
 
import bzrlib.bzrdir as bzrdir
22
 
import bzrlib.errors as errors
23
 
from bzrlib.osutils import pathjoin, lexists
24
 
from bzrlib.tests import TestCaseWithTransport, TestCase, TestSkipped
25
 
import bzrlib.transport
26
 
from bzrlib.workingtree import WorkingTree
 
22
from bzrlib.selftest import TestCaseInTempDir
 
23
from bzrlib.selftest.testtransport import TestTransportMixIn
27
24
 
28
25
try:
29
26
    import paramiko
 
27
    from stub_sftp import StubServer, StubSFTPServer
30
28
    paramiko_loaded = True
31
29
except ImportError:
32
30
    paramiko_loaded = False
33
31
 
34
32
 
35
 
class TestCaseWithSFTPServer(TestCaseWithTransport):
36
 
    """A test case base class that provides a sftp server on localhost."""
37
 
 
38
 
    def setUp(self):
39
 
        if not paramiko_loaded:
40
 
            raise TestSkipped('you must have paramiko to run this test')
41
 
        super(TestCaseWithSFTPServer, self).setUp()
42
 
        from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
43
 
        if getattr(self, '_get_remote_is_absolute', None) is None:
44
 
            self._get_remote_is_absolute = True
45
 
        if self._get_remote_is_absolute:
46
 
            self.transport_server = SFTPAbsoluteServer
47
 
        else:
48
 
            self.transport_server = SFTPHomeDirServer
49
 
        self.transport_readonly_server = bzrlib.transport.http.HttpServer
50
 
 
51
 
    def get_transport(self, path=None):
52
 
        """Return a transport relative to self._test_root."""
53
 
        return bzrlib.transport.get_transport(self.get_url(path))
54
 
 
55
 
 
56
 
class SFTPLockTests (TestCaseWithSFTPServer):
57
 
 
58
 
    def test_sftp_locks(self):
59
 
        from bzrlib.errors import LockError
60
 
        t = self.get_transport()
61
 
 
62
 
        l = t.lock_write('bogus')
63
 
        self.failUnlessExists('bogus.write-lock')
64
 
 
65
 
        # Don't wait for the lock, locking an already locked
66
 
        # file should raise an assert
67
 
        self.assertRaises(LockError, t.lock_write, 'bogus')
68
 
 
69
 
        l.unlock()
70
 
        self.failIf(lexists('bogus.write-lock'))
71
 
 
72
 
        open('something.write-lock', 'wb').write('fake lock\n')
73
 
        self.assertRaises(LockError, t.lock_write, 'something')
74
 
        os.remove('something.write-lock')
75
 
 
76
 
        l = t.lock_write('something')
77
 
 
78
 
        l2 = t.lock_write('bogus')
79
 
 
80
 
        l.unlock()
81
 
        l2.unlock()
82
 
 
83
 
    def test_multiple_connections(self):
84
 
        t = self.get_transport()
85
 
        self.assertTrue('sftpserver - new connection' in self.get_server().logs)
86
 
        self.get_server().logs = []
87
 
        # The second request should reuse the first connection
88
 
        # SingleListener only allows for a single connection,
89
 
        # So the next line fails unless the connection is reused
90
 
        t2 = self.get_transport()
91
 
        self.assertEquals(self.get_server().logs, [])
92
 
 
93
 
 
94
 
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
95
 
    """Test the SFTP transport with homedir based relative paths."""
96
 
 
97
 
    def test__remote_path(self):
98
 
        t = self.get_transport()
99
 
        # try what is currently used:
100
 
        # remote path = self._abspath(relpath)
101
 
        self.assertEqual(self.test_dir + '/relative', t._remote_path('relative'))
102
 
        # we dont os.path.join because windows gives us the wrong path
103
 
        root_segments = self.test_dir.split('/')
104
 
        root_parent = '/'.join(root_segments[:-1])
105
 
        # .. should be honoured
106
 
        self.assertEqual(root_parent + '/sibling', t._remote_path('../sibling'))
107
 
        # /  should be illegal ?
108
 
        ### FIXME decide and then test for all transports. RBC20051208
109
 
 
110
 
 
111
 
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
112
 
    """Test the SFTP transport with homedir based relative paths."""
113
 
 
114
 
    def setUp(self):
115
 
        self._get_remote_is_absolute = False
116
 
        super(SFTPTransportTestRelative, self).setUp()
117
 
 
118
 
    def test__remote_path_relative_root(self):
119
 
        # relative paths are preserved
120
 
        t = self.get_transport('')
121
 
        self.assertEqual('a', t._remote_path('a'))
122
 
 
123
 
 
124
 
class FakeSFTPTransport (object):
125
 
    _sftp = object()
126
 
fake = FakeSFTPTransport()
127
 
 
128
 
 
129
 
class SFTPNonServerTest(TestCase):
130
 
    def setUp(self):
131
 
        TestCase.setUp(self)
132
 
        if not paramiko_loaded:
133
 
            raise TestSkipped('you must have paramiko to run this test')
134
 
 
135
 
    def test_parse_url(self):
136
 
        from bzrlib.transport.sftp import SFTPTransport
137
 
        s = SFTPTransport('sftp://simple.example.com/home/source', clone_from=fake)
138
 
        self.assertEquals(s._host, 'simple.example.com')
139
 
        self.assertEquals(s._port, None)
140
 
        self.assertEquals(s._path, '/home/source')
141
 
        self.failUnless(s._password is None)
142
 
 
143
 
        self.assertEquals(s.base, 'sftp://simple.example.com/home/source/')
144
 
 
145
 
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative', clone_from=fake)
146
 
        self.assertEquals(s._host, 'example.com')
147
 
        self.assertEquals(s._port, 2222)
148
 
        self.assertEquals(s._username, 'robey')
149
 
        self.assertEquals(s._password, 'h@t')
150
 
        self.assertEquals(s._path, 'relative')
151
 
 
152
 
        # Base should not keep track of the password
153
 
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/~/relative/')
154
 
 
155
 
    def test_relpath(self):
156
 
        from bzrlib.transport.sftp import SFTPTransport
157
 
        from bzrlib.errors import PathNotChild
158
 
 
159
 
        s = SFTPTransport('sftp://user@host.com/abs/path', clone_from=fake)
160
 
        self.assertEquals(s.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
161
 
        # Can't test this one, because we actually get an AssertionError
162
 
        # TODO: Consider raising an exception rather than an assert
163
 
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com/abs/path/sub')
164
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com/abs/path/sub')
165
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com/abs/path/sub')
166
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33/abs/path/sub')
167
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/~/rel/path/sub')
168
 
 
169
 
        # Make sure it works when we don't supply a username
170
 
        s = SFTPTransport('sftp://host.com/abs/path', clone_from=fake)
171
 
        self.assertEquals(s.relpath('sftp://host.com/abs/path/sub'), 'sub')
172
 
 
173
 
        # Make sure it works when parts of the path will be url encoded
174
 
        # TODO: These may be incorrect, we might need to urllib.urlencode() before
175
 
        # we pass the paths into the SFTPTransport constructor
176
 
        s = SFTPTransport('sftp://host.com/dev/,path', clone_from=fake)
177
 
        self.assertEquals(s.relpath('sftp://host.com/dev/,path/sub'), 'sub')
178
 
        s = SFTPTransport('sftp://host.com/dev/%path', clone_from=fake)
179
 
        self.assertEquals(s.relpath('sftp://host.com/dev/%path/sub'), 'sub')
180
 
 
181
 
    def test_parse_invalid_url(self):
182
 
        from bzrlib.transport.sftp import SFTPTransport, TransportError
183
 
        try:
184
 
            s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
185
 
                              clone_from=fake)
186
 
            self.fail('expected exception not raised')
187
 
        except TransportError, e:
188
 
            self.assertEquals(str(e), 
189
 
                    '~janneke: invalid port number')
190
 
 
191
 
 
192
 
class SFTPBranchTest(TestCaseWithSFTPServer):
193
 
    """Test some stuff when accessing a bzr Branch over sftp"""
194
 
 
195
 
    def test_lock_file(self):
196
 
        # old format branches use a special lock file on sftp.
197
 
        b = self.make_branch('', format=bzrdir.BzrDirFormat6())
198
 
        b = bzrlib.branch.Branch.open(self.get_url())
199
 
        self.failUnlessExists('.bzr/')
200
 
        self.failUnlessExists('.bzr/branch-format')
201
 
        self.failUnlessExists('.bzr/branch-lock')
202
 
 
203
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
204
 
        b.lock_write()
205
 
        self.failUnlessExists('.bzr/branch-lock.write-lock')
206
 
        b.unlock()
207
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
208
 
 
209
 
    def test_push_support(self):
210
 
        self.build_tree(['a/', 'a/foo'])
211
 
        t = bzrdir.BzrDir.create_standalone_workingtree('a')
212
 
        b = t.branch
213
 
        t.add('foo')
214
 
        t.commit('foo', rev_id='a1')
215
 
 
216
 
        b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
217
 
        b2.pull(b)
218
 
 
219
 
        self.assertEquals(b2.revision_history(), ['a1'])
220
 
 
221
 
        open('a/foo', 'wt').write('something new in foo\n')
222
 
        t.commit('new', rev_id='a2')
223
 
        b2.pull(b)
224
 
 
225
 
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
226
 
 
227
 
 
228
 
class SFTPFullHandshakingTest(TestCaseWithSFTPServer):
229
 
    """Verify that a full-handshake (SSH over loopback TCP) sftp connection works."""
230
 
    
231
 
    def test_connection(self):
232
 
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
233
 
        self.transport_server = SFTPFullAbsoluteServer
234
 
        self.get_transport()
 
33
STUB_SERVER_KEY = """
 
34
-----BEGIN RSA PRIVATE KEY-----
 
35
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
36
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
37
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
38
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
39
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
40
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
41
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
42
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
43
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
44
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
45
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
46
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
47
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
48
-----END RSA PRIVATE KEY-----
 
49
"""
 
50
    
 
51
 
 
52
class SingleListener (threading.Thread):
 
53
    def __init__(self, callback):
 
54
        threading.Thread.__init__(self)
 
55
        self._callback = callback
 
56
        self._socket = socket.socket()
 
57
        self._socket.listen(1)
 
58
        self.port = self._socket.getsockname()[1]
 
59
        self.stop_event = threading.Event()
 
60
 
 
61
    def run(self):
 
62
        s, _ = self._socket.accept()
 
63
        # now close the listen socket
 
64
        self._socket.close()
 
65
        self._callback(s, self.stop_event)
 
66
    
 
67
    def stop(self):
 
68
        self.stop_event.set()
 
69
        
 
70
        
 
71
class TestCaseWithSFTPServer (TestCaseInTempDir):
 
72
    """
 
73
    Execute a test case with a stub SFTP server, serving files from the local
 
74
    filesystem over the loopback network.
 
75
    """
 
76
    
 
77
    def _run_server(self, s, stop_event):
 
78
        ssh_server = paramiko.Transport(s)
 
79
        key_file = os.path.join(self._root, 'test_rsa.key')
 
80
        file(key_file, 'w').write(STUB_SERVER_KEY)
 
81
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
82
        ssh_server.add_server_key(host_key)
 
83
        server = StubServer()
 
84
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
 
85
        event = threading.Event()
 
86
        ssh_server.start_server(event, server)
 
87
        event.wait(5.0)
 
88
        stop_event.wait(30.0)
 
89
 
 
90
    def setUp(self):
 
91
        TestCaseInTempDir.setUp(self)
 
92
        self._root = self.test_dir
 
93
 
 
94
        self._listener = SingleListener(self._run_server)
 
95
        self._listener.setDaemon(True)
 
96
        self._listener.start()        
 
97
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
 
98
        
 
99
    def tearDown(self):
 
100
        self._listener.stop()
 
101
        TestCaseInTempDir.tearDown(self)
 
102
 
 
103
        
 
104
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
 
105
    readonly = False
 
106
 
 
107
    def get_transport(self):
 
108
        from bzrlib.transport.sftp import SFTPTransport
 
109
        url = self._sftp_url
 
110
        return SFTPTransport(url)
 
111
 
 
112
if not paramiko_loaded:
 
113
    del SFTPTransportTest