~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp_transport.py

  • Committer: Aaron Bentley
  • Date: 2006-04-07 22:46:52 UTC
  • mfrom: (1645 +trunk)
  • mto: This revision was merged to the branch mainline in revision 1727.
  • Revision ID: aaron.bentley@utoronto.ca-20060407224652-4925bc3735b926f8
Merged latest bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
import socket
19
19
import threading
20
20
 
21
 
from bzrlib.tests import TestCaseInTempDir, TestCase
22
 
from bzrlib.tests.test_transport import TestTransportMixIn
 
21
import bzrlib.bzrdir as bzrdir
23
22
import bzrlib.errors as errors
 
23
from bzrlib.osutils import pathjoin, lexists
 
24
from bzrlib.tests import TestCaseWithTransport, TestCase, TestSkipped
 
25
import bzrlib.transport
 
26
from bzrlib.workingtree import WorkingTree
24
27
 
25
28
try:
26
29
    import paramiko
27
 
    from stub_sftp import StubServer, StubSFTPServer
28
30
    paramiko_loaded = True
29
31
except ImportError:
30
32
    paramiko_loaded = False
31
33
 
32
 
# XXX: 20051124 jamesh
33
 
# The tests currently pop up a password prompt when an external ssh
34
 
# is used.  This forces the use of the paramiko implementation.
35
 
if paramiko_loaded:
36
 
    import bzrlib.transport.sftp
37
 
    bzrlib.transport.sftp._ssh_vendor = 'none'
38
 
 
39
 
 
40
 
STUB_SERVER_KEY = """
41
 
-----BEGIN RSA PRIVATE KEY-----
42
 
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
43
 
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
44
 
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
45
 
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
46
 
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
47
 
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
48
 
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
49
 
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
50
 
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
51
 
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
52
 
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
53
 
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
54
 
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
55
 
-----END RSA PRIVATE KEY-----
56
 
"""
57
 
    
58
 
 
59
 
class SingleListener (threading.Thread):
60
 
    def __init__(self, callback):
61
 
        threading.Thread.__init__(self)
62
 
        self._callback = callback
63
 
        self._socket = socket.socket()
64
 
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
65
 
        self._socket.bind(('localhost', 0))
66
 
        self._socket.listen(1)
67
 
        self.port = self._socket.getsockname()[1]
68
 
        self.stop_event = threading.Event()
69
 
 
70
 
    def run(self):
71
 
        s, _ = self._socket.accept()
72
 
        # now close the listen socket
73
 
        self._socket.close()
74
 
        self._callback(s, self.stop_event)
75
 
    
76
 
    def stop(self):
77
 
        self.stop_event.set()
78
 
        # We should consider waiting for the other thread
79
 
        # to stop, because otherwise we get spurious
80
 
        #   bzr: ERROR: Socket exception: Connection reset by peer (54)
81
 
        # because the test suite finishes before the thread has a chance
82
 
        # to close. (Especially when only running a few tests)
83
 
        
84
 
        
85
 
class TestCaseWithSFTPServer (TestCaseInTempDir):
86
 
    """
87
 
    Execute a test case with a stub SFTP server, serving files from the local
88
 
    filesystem over the loopback network.
89
 
    """
90
 
    
91
 
    def _run_server(self, s, stop_event):
92
 
        ssh_server = paramiko.Transport(s)
93
 
        key_file = os.path.join(self._root, 'test_rsa.key')
94
 
        file(key_file, 'w').write(STUB_SERVER_KEY)
95
 
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
96
 
        ssh_server.add_server_key(host_key)
97
 
        server = StubServer(self)
98
 
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
99
 
        event = threading.Event()
100
 
        ssh_server.start_server(event, server)
101
 
        event.wait(5.0)
102
 
        stop_event.wait(30.0)
103
 
 
104
 
    def setUp(self):
105
 
        TestCaseInTempDir.setUp(self)
106
 
        self._root = self.test_dir
107
 
        self._is_setup = False
108
 
 
109
 
    def delayed_setup(self):
110
 
        # some tests are just stubs that call setUp and then immediately call
111
 
        # tearDown.  so don't create the port listener until get_transport is
112
 
        # called and we know we're in an actual test.
113
 
        if self._is_setup:
114
 
            return
115
 
        self._listener = SingleListener(self._run_server)
116
 
        self._listener.setDaemon(True)
117
 
        self._listener.start()        
118
 
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
119
 
        self._is_setup = True
120
 
        
121
 
    def tearDown(self):
122
 
        try:
123
 
            self._listener.stop()
124
 
        except AttributeError:
125
 
            pass
126
 
        TestCaseInTempDir.tearDown(self)
127
 
 
128
 
        
129
 
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
130
 
    readonly = False
131
 
 
132
 
    def setUp(self):
133
 
        TestCaseWithSFTPServer.setUp(self)
134
 
        self.sftplogs = []
135
 
 
136
 
    def log(self, *args):
137
 
        """Override the default log to grab sftp server messages"""
138
 
        TestCaseWithSFTPServer.log(self, *args)
139
 
        if args and args[0].startswith('sftpserver'):
140
 
            self.sftplogs.append(args[0])
141
 
 
142
 
    def get_transport(self):
143
 
        self.delayed_setup()
144
 
        from bzrlib.transport.sftp import SFTPTransport
145
 
        url = self._sftp_url
146
 
        return SFTPTransport(url)
 
34
 
 
35
class TestCaseWithSFTPServer(TestCaseWithTransport):
 
36
    """A test case base class that provides a sftp server on localhost."""
 
37
 
 
38
    def setUp(self):
 
39
        if not paramiko_loaded:
 
40
            raise TestSkipped('you must have paramiko to run this test')
 
41
        super(TestCaseWithSFTPServer, self).setUp()
 
42
        from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
 
43
        if getattr(self, '_get_remote_is_absolute', None) is None:
 
44
            self._get_remote_is_absolute = True
 
45
        if self._get_remote_is_absolute:
 
46
            self.transport_server = SFTPAbsoluteServer
 
47
        else:
 
48
            self.transport_server = SFTPHomeDirServer
 
49
        self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
50
 
 
51
    def get_transport(self, path=None):
 
52
        """Return a transport relative to self._test_root."""
 
53
        return bzrlib.transport.get_transport(self.get_url(path))
 
54
 
 
55
 
 
56
class SFTPLockTests (TestCaseWithSFTPServer):
147
57
 
148
58
    def test_sftp_locks(self):
149
59
        from bzrlib.errors import LockError
157
67
        self.assertRaises(LockError, t.lock_write, 'bogus')
158
68
 
159
69
        l.unlock()
160
 
        self.failIf(os.path.lexists('bogus.write-lock'))
 
70
        self.failIf(lexists('bogus.write-lock'))
161
71
 
162
72
        open('something.write-lock', 'wb').write('fake lock\n')
163
73
        self.assertRaises(LockError, t.lock_write, 'something')
172
82
 
173
83
    def test_multiple_connections(self):
174
84
        t = self.get_transport()
175
 
        self.assertEquals(self.sftplogs, 
176
 
                ['sftpserver - authorizing: foo'
177
 
               , 'sftpserver - channel request: session, 1'])
178
 
        self.sftplogs = []
 
85
        self.assertTrue('sftpserver - new connection' in self.get_server().logs)
 
86
        self.get_server().logs = []
179
87
        # The second request should reuse the first connection
180
88
        # SingleListener only allows for a single connection,
181
89
        # So the next line fails unless the connection is reused
182
90
        t2 = self.get_transport()
183
 
        self.assertEquals(self.sftplogs, [])
 
91
        self.assertEquals(self.get_server().logs, [])
 
92
 
 
93
 
 
94
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
 
95
    """Test the SFTP transport with homedir based relative paths."""
 
96
 
 
97
    def test__remote_path(self):
 
98
        t = self.get_transport()
 
99
        # try what is currently used:
 
100
        # remote path = self._abspath(relpath)
 
101
        self.assertEqual(self.test_dir + '/relative', t._remote_path('relative'))
 
102
        # we don't os.path.join because Windows gives us the wrong path
 
103
        root_segments = self.test_dir.split('/')
 
104
        root_parent = '/'.join(root_segments[:-1])
 
105
        # .. should be honoured
 
106
        self.assertEqual(root_parent + '/sibling', t._remote_path('../sibling'))
 
107
        # /  should be illegal ?
 
108
        ### FIXME decide and then test for all transports. RBC20051208
 
109
 
 
110
 
 
111
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
 
112
    """Test the SFTP transport with homedir based relative paths."""
 
113
 
 
114
    def setUp(self):
 
115
        self._get_remote_is_absolute = False
 
116
        super(SFTPTransportTestRelative, self).setUp()
 
117
 
 
118
    def test__remote_path_relative_root(self):
 
119
        # relative paths are preserved
 
120
        t = self.get_transport('')
 
121
        self.assertEqual('a', t._remote_path('a'))
184
122
 
185
123
 
186
124
class FakeSFTPTransport (object):
189
127
 
190
128
 
191
129
class SFTPNonServerTest(TestCase):
 
130
    def setUp(self):
 
131
        TestCase.setUp(self)
 
132
        if not paramiko_loaded:
 
133
            raise TestSkipped('you must have paramiko to run this test')
 
134
 
192
135
    def test_parse_url(self):
193
136
        from bzrlib.transport.sftp import SFTPTransport
194
 
        s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
 
137
        s = SFTPTransport('sftp://simple.example.com/home/source', clone_from=fake)
195
138
        self.assertEquals(s._host, 'simple.example.com')
196
139
        self.assertEquals(s._port, None)
197
140
        self.assertEquals(s._path, '/home/source')
198
141
        self.failUnless(s._password is None)
199
142
 
200
 
        self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source')
201
 
        
202
 
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
 
143
        self.assertEquals(s.base, 'sftp://simple.example.com/home/source/')
 
144
 
 
145
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative', clone_from=fake)
203
146
        self.assertEquals(s._host, 'example.com')
204
147
        self.assertEquals(s._port, 2222)
205
148
        self.assertEquals(s._username, 'robey')
207
150
        self.assertEquals(s._path, 'relative')
208
151
 
209
152
        # Base should not keep track of the password
210
 
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative')
211
 
 
212
 
        # Double slash should be accepted instead of using %2F
213
 
        s = SFTPTransport('sftp://user@example.com:22//absolute/path', clone_from=fake)
214
 
        self.assertEquals(s._host, 'example.com')
215
 
        self.assertEquals(s._port, 22)
216
 
        self.assertEquals(s._username, 'user')
217
 
        self.assertEquals(s._password, None)
218
 
        self.assertEquals(s._path, '/absolute/path')
219
 
 
220
 
        # Also, don't show the port if it is the default 22
221
 
        self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path')
 
153
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/~/relative/')
222
154
 
223
155
    def test_relpath(self):
224
156
        from bzrlib.transport.sftp import SFTPTransport
225
157
        from bzrlib.errors import PathNotChild
226
158
 
227
 
        s = SFTPTransport('sftp://user@host.com//abs/path', clone_from=fake)
228
 
        self.assertEquals(s.relpath('sftp://user@host.com//abs/path/sub'), 'sub')
 
159
        s = SFTPTransport('sftp://user@host.com/abs/path', clone_from=fake)
 
160
        self.assertEquals(s.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
229
161
        # Can't test this one, because we actually get an AssertionError
230
162
        # TODO: Consider raising an exception rather than an assert
231
 
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com//abs/path/sub')
232
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com//abs/path/sub')
233
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com//abs/path/sub')
234
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33//abs/path/sub')
235
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/abs/path/sub')
 
163
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com/abs/path/sub')
 
164
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com/abs/path/sub')
 
165
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com/abs/path/sub')
 
166
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33/abs/path/sub')
 
167
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/~/rel/path/sub')
236
168
 
237
169
        # Make sure it works when we don't supply a username
238
 
        s = SFTPTransport('sftp://host.com//abs/path', clone_from=fake)
239
 
        self.assertEquals(s.relpath('sftp://host.com//abs/path/sub'), 'sub')
 
170
        s = SFTPTransport('sftp://host.com/abs/path', clone_from=fake)
 
171
        self.assertEquals(s.relpath('sftp://host.com/abs/path/sub'), 'sub')
240
172
 
241
173
        # Make sure it works when parts of the path will be url encoded
242
174
        # TODO: These may be incorrect, we might need to urllib.urlencode() before
262
194
 
263
195
    def test_lock_file(self):
264
196
        """Make sure that a Branch accessed over sftp tries to lock itself."""
265
 
        from bzrlib.branch import Branch
266
 
 
267
 
        self.delayed_setup()
268
 
        b = Branch.initialize(self._sftp_url)
 
197
        b = bzrdir.BzrDir.create_branch_and_repo(self.get_url())
269
198
        self.failUnlessExists('.bzr/')
270
199
        self.failUnlessExists('.bzr/branch-format')
271
200
        self.failUnlessExists('.bzr/branch-lock')
272
201
 
273
 
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
 
202
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
274
203
        b.lock_write()
275
204
        self.failUnlessExists('.bzr/branch-lock.write-lock')
276
205
        b.unlock()
277
 
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
278
 
 
279
 
    def test_no_working_tree(self):
280
 
        from bzrlib.branch import Branch
281
 
        self.delayed_setup()
282
 
        b = Branch.initialize(self._sftp_url)
283
 
        self.assertRaises(errors.NoWorkingTree, b.working_tree)
 
206
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
284
207
 
285
208
    def test_push_support(self):
286
 
        from bzrlib.branch import Branch
287
 
        self.delayed_setup()
288
 
 
289
209
        self.build_tree(['a/', 'a/foo'])
290
 
        b = Branch.initialize('a')
291
 
        t = b.working_tree()
 
210
        t = bzrdir.BzrDir.create_standalone_workingtree('a')
 
211
        b = t.branch
292
212
        t.add('foo')
293
213
        t.commit('foo', rev_id='a1')
294
214
 
295
 
        os.mkdir('b')
296
 
        b2 = Branch.initialize(self._sftp_url + 'b')
 
215
        b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
297
216
        b2.pull(b)
298
217
 
299
218
        self.assertEquals(b2.revision_history(), ['a1'])
300
219
 
301
 
 
302
 
if not paramiko_loaded:
303
 
    # TODO: Skip these
304
 
    del SFTPTransportTest
305
 
    del SFTPNonServerTest
306
 
    del SFTPBranchTest
 
220
        open('a/foo', 'wt').write('something new in foo\n')
 
221
        t.commit('new', rev_id='a2')
 
222
        b2.pull(b)
 
223
 
 
224
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
 
225
 
 
226
 
 
227
class SFTPFullHandshakingTest(TestCaseWithSFTPServer):
 
228
    """Verify that a full-handshake (SSH over loopback TCP) sftp connection works."""
 
229
    
 
230
    def test_connection(self):
 
231
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
232
        self.transport_server = SFTPFullAbsoluteServer
 
233
        self.get_transport()