~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp_transport.py

Merge from integration.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
import socket
19
19
import threading
20
20
 
21
 
from bzrlib.tests import TestCaseInTempDir, TestCase
22
 
from bzrlib.tests.test_transport import TestTransportMixIn
 
21
from bzrlib.branch import Branch
23
22
import bzrlib.errors as errors
 
23
from bzrlib.osutils import pathjoin, lexists
 
24
from bzrlib.tests import TestCaseInTempDir, TestCase, TestSkipped
 
25
import bzrlib.transport
24
26
 
25
27
try:
26
28
    import paramiko
27
 
    from stub_sftp import StubServer, StubSFTPServer
28
29
    paramiko_loaded = True
29
30
except ImportError:
30
31
    paramiko_loaded = False
31
32
 
32
 
# XXX: 20051124 jamesh
33
 
# The tests currently pop up a password prompt when an external ssh
34
 
# is used.  This forces the use of the paramiko implementation.
35
 
if paramiko_loaded:
36
 
    import bzrlib.transport.sftp
37
 
    bzrlib.transport.sftp._ssh_vendor = 'none'
38
 
 
39
 
 
40
 
STUB_SERVER_KEY = """
41
 
-----BEGIN RSA PRIVATE KEY-----
42
 
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
43
 
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
44
 
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
45
 
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
46
 
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
47
 
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
48
 
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
49
 
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
50
 
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
51
 
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
52
 
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
53
 
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
54
 
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
55
 
-----END RSA PRIVATE KEY-----
56
 
"""
57
 
    
58
 
 
59
 
class SingleListener (threading.Thread):
60
 
    def __init__(self, callback):
61
 
        threading.Thread.__init__(self)
62
 
        self._callback = callback
63
 
        self._socket = socket.socket()
64
 
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
65
 
        self._socket.bind(('localhost', 0))
66
 
        self._socket.listen(1)
67
 
        self.port = self._socket.getsockname()[1]
68
 
        self.stop_event = threading.Event()
69
 
 
70
 
    def run(self):
71
 
        s, _ = self._socket.accept()
72
 
        # now close the listen socket
73
 
        self._socket.close()
74
 
        self._callback(s, self.stop_event)
75
 
    
76
 
    def stop(self):
77
 
        self.stop_event.set()
78
 
        # We should consider waiting for the other thread
79
 
        # to stop, because otherwise we get spurious
80
 
        #   bzr: ERROR: Socket exception: Connection reset by peer (54)
81
 
        # because the test suite finishes before the thread has a chance
82
 
        # to close. (Especially when only running a few tests)
83
 
        
84
 
        
85
 
class TestCaseWithSFTPServer (TestCaseInTempDir):
86
 
    """
87
 
    Execute a test case with a stub SFTP server, serving files from the local
88
 
    filesystem over the loopback network.
89
 
    """
90
 
    
91
 
    def _run_server(self, s, stop_event):
92
 
        ssh_server = paramiko.Transport(s)
93
 
        key_file = os.path.join(self._root, 'test_rsa.key')
94
 
        file(key_file, 'w').write(STUB_SERVER_KEY)
95
 
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
96
 
        ssh_server.add_server_key(host_key)
97
 
        server = StubServer(self)
98
 
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
99
 
        event = threading.Event()
100
 
        ssh_server.start_server(event, server)
101
 
        event.wait(5.0)
102
 
        stop_event.wait(30.0)
 
33
 
 
34
class TestCaseWithSFTPServer(TestCaseInTempDir):
 
35
    """A test case base class that provides a sftp server on localhost."""
103
36
 
104
37
    def setUp(self):
105
 
        TestCaseInTempDir.setUp(self)
 
38
        if not paramiko_loaded:
 
39
            raise TestSkipped('you must have paramiko to run this test')
 
40
        super(TestCaseWithSFTPServer, self).setUp()
 
41
        from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
 
42
        if getattr(self, '_get_remote_is_absolute', None) is None:
 
43
            self._get_remote_is_absolute = True
 
44
        if self._get_remote_is_absolute:
 
45
            self.server = SFTPAbsoluteServer()
 
46
        else:
 
47
            self.server = SFTPHomeDirServer()
 
48
        self.server.setUp()
 
49
        self.addCleanup(self.server.tearDown)
 
50
        self._sftp_url = self.server.get_url()
106
51
        self._root = self.test_dir
 
52
        # Set to a string in setUp to give sftp server a new homedir.
 
53
        self._override_home = None
107
54
        self._is_setup = False
108
 
 
109
 
    def delayed_setup(self):
110
 
        # some tests are just stubs that call setUp and then immediately call
111
 
        # tearDwon.  so don't create the port listener until get_transport is
112
 
        # called and we know we're in an actual test.
113
 
        if self._is_setup:
114
 
            return
115
 
        self._listener = SingleListener(self._run_server)
116
 
        self._listener.setDaemon(True)
117
 
        self._listener.start()        
118
 
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
119
 
        self._is_setup = True
120
 
        
121
 
    def tearDown(self):
122
 
        try:
123
 
            self._listener.stop()
124
 
        except AttributeError:
125
 
            pass
126
 
        TestCaseInTempDir.tearDown(self)
127
 
 
128
 
        
129
 
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
130
 
    readonly = False
131
 
 
132
 
    def setUp(self):
133
 
        TestCaseWithSFTPServer.setUp(self)
134
55
        self.sftplogs = []
135
56
 
136
 
    def log(self, *args):
137
 
        """Override the default log to grab sftp server messages"""
138
 
        TestCaseWithSFTPServer.log(self, *args)
139
 
        if args and args[0].startswith('sftpserver'):
140
 
            self.sftplogs.append(args[0])
141
 
 
142
 
    def get_transport(self):
143
 
        self.delayed_setup()
144
 
        from bzrlib.transport.sftp import SFTPTransport
145
 
        url = self._sftp_url
146
 
        return SFTPTransport(url)
 
57
    def get_remote_url(self, relpath_to_test_root):
 
58
        # FIXME use urljoin ?
 
59
        return self._sftp_url + '/' + relpath_to_test_root
 
60
 
 
61
    def get_transport(self, path=None):
 
62
        """Return a transport relative to self._test_root."""
 
63
        from bzrlib.transport import get_transport
 
64
        transport = get_transport(self._sftp_url)
 
65
        if path is None:
 
66
            return transport
 
67
        else:
 
68
            return transport.clone(path)
 
69
 
 
70
 
 
71
class SFTPLockTests (TestCaseWithSFTPServer):
147
72
 
148
73
    def test_sftp_locks(self):
149
74
        from bzrlib.errors import LockError
157
82
        self.assertRaises(LockError, t.lock_write, 'bogus')
158
83
 
159
84
        l.unlock()
160
 
        self.failIf(os.path.lexists('bogus.write-lock'))
 
85
        self.failIf(lexists('bogus.write-lock'))
161
86
 
162
87
        open('something.write-lock', 'wb').write('fake lock\n')
163
88
        self.assertRaises(LockError, t.lock_write, 'something')
172
97
 
173
98
    def test_multiple_connections(self):
174
99
        t = self.get_transport()
175
 
        self.assertEquals(self.sftplogs, 
 
100
        self.assertEquals(self.server.logs, 
176
101
                ['sftpserver - authorizing: foo'
177
102
               , 'sftpserver - channel request: session, 1'])
178
 
        self.sftplogs = []
 
103
        self.server.logs = []
179
104
        # The second request should reuse the first connection
180
105
        # SingleListener only allows for a single connection,
181
106
        # So the next line fails unless the connection is reused
182
107
        t2 = self.get_transport()
183
 
        self.assertEquals(self.sftplogs, [])
 
108
        self.assertEquals(self.server.logs, [])
 
109
 
 
110
 
 
111
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
 
112
    """Test the SFTP transport with homedir based relative paths."""
 
113
 
 
114
    def test__remote_path(self):
 
115
        t = self.get_transport()
 
116
        # try what is currently used:
 
117
        # remote path = self._abspath(relpath)
 
118
        self.assertEqual(self._root + '/relative', t._remote_path('relative'))
 
119
        # we dont os.path.join because windows gives us the wrong path
 
120
        root_segments = self._root.split('/')
 
121
        root_parent = '/'.join(root_segments[:-1])
 
122
        # .. should be honoured
 
123
        self.assertEqual(root_parent + '/sibling', t._remote_path('../sibling'))
 
124
        # /  should be illegal ?
 
125
        ### FIXME decide and then test for all transports. RBC20051208
 
126
 
 
127
 
 
128
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
 
129
    """Test the SFTP transport with homedir based relative paths."""
 
130
 
 
131
    def setUp(self):
 
132
        self._get_remote_is_absolute = False
 
133
        super(SFTPTransportTestRelative, self).setUp()
 
134
 
 
135
    def test__remote_path_relative_root(self):
 
136
        # relative paths are preserved
 
137
        t = self.get_transport('')
 
138
        self.assertEqual('a', t._remote_path('a'))
184
139
 
185
140
 
186
141
class FakeSFTPTransport (object):
189
144
 
190
145
 
191
146
class SFTPNonServerTest(TestCase):
 
147
    def setUp(self):
 
148
        TestCase.setUp(self)
 
149
        if not paramiko_loaded:
 
150
            raise TestSkipped('you must have paramiko to run this test')
 
151
 
192
152
    def test_parse_url(self):
193
153
        from bzrlib.transport.sftp import SFTPTransport
194
154
        s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
197
157
        self.assertEquals(s._path, '/home/source')
198
158
        self.failUnless(s._password is None)
199
159
 
200
 
        self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source')
 
160
        self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source/')
201
161
        
202
162
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
203
163
        self.assertEquals(s._host, 'example.com')
207
167
        self.assertEquals(s._path, 'relative')
208
168
 
209
169
        # Base should not keep track of the password
210
 
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative')
 
170
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative/')
211
171
 
212
172
        # Double slash should be accepted instead of using %2F
213
 
        s = SFTPTransport('sftp://user@example.com:22//absolute/path', clone_from=fake)
 
173
        s = SFTPTransport('sftp://user@example.com:22//absolute/path/', clone_from=fake)
214
174
        self.assertEquals(s._host, 'example.com')
215
175
        self.assertEquals(s._port, 22)
216
176
        self.assertEquals(s._username, 'user')
217
177
        self.assertEquals(s._password, None)
218
 
        self.assertEquals(s._path, '/absolute/path')
 
178
        self.assertEquals(s._path, '/absolute/path/')
219
179
 
220
180
        # Also, don't show the port if it is the default 22
221
 
        self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path')
 
181
        self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path/')
222
182
 
223
183
    def test_relpath(self):
224
184
        from bzrlib.transport.sftp import SFTPTransport
262
222
 
263
223
    def test_lock_file(self):
264
224
        """Make sure that a Branch accessed over sftp tries to lock itself."""
265
 
        from bzrlib.branch import Branch
266
 
 
267
 
        self.delayed_setup()
268
225
        b = Branch.initialize(self._sftp_url)
269
226
        self.failUnlessExists('.bzr/')
270
227
        self.failUnlessExists('.bzr/branch-format')
271
228
        self.failUnlessExists('.bzr/branch-lock')
272
229
 
273
 
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
 
230
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
274
231
        b.lock_write()
275
232
        self.failUnlessExists('.bzr/branch-lock.write-lock')
276
233
        b.unlock()
277
 
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
 
234
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
278
235
 
279
236
    def test_no_working_tree(self):
280
 
        from bzrlib.branch import Branch
281
 
        self.delayed_setup()
282
237
        b = Branch.initialize(self._sftp_url)
283
238
        self.assertRaises(errors.NoWorkingTree, b.working_tree)
284
239
 
285
240
    def test_push_support(self):
286
 
        from bzrlib.branch import Branch
287
 
        self.delayed_setup()
288
 
 
289
241
        self.build_tree(['a/', 'a/foo'])
290
242
        b = Branch.initialize('a')
291
243
        t = b.working_tree()
293
245
        t.commit('foo', rev_id='a1')
294
246
 
295
247
        os.mkdir('b')
296
 
        b2 = Branch.initialize(self._sftp_url + 'b')
 
248
        b2 = Branch.initialize(self._sftp_url + '/b')
297
249
        b2.pull(b)
298
250
 
299
251
        self.assertEquals(b2.revision_history(), ['a1'])
300
252
 
301
 
 
302
 
if not paramiko_loaded:
303
 
    # TODO: Skip these
304
 
    del SFTPTransportTest
305
 
    del SFTPNonServerTest
306
 
    del SFTPBranchTest
 
253
        open('a/foo', 'wt').write('something new in foo\n')
 
254
        t.commit('new', rev_id='a2')
 
255
        b2.pull(b)
 
256
 
 
257
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
 
258
 
 
259