~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp_transport.py

Merge from integration.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
import os
18
18
import socket
19
19
import threading
20
 
import unittest
21
20
 
22
 
from bzrlib.selftest import TestCaseInTempDir
23
 
from bzrlib.selftest.testtransport import TestTransportMixIn
 
21
from bzrlib.tests import TestCaseInTempDir, TestCase, TestSkipped
 
22
from bzrlib.tests.test_transport import TestTransportMixIn
 
23
import bzrlib.errors as errors
 
24
from bzrlib.osutils import pathjoin, lexists
24
25
 
25
26
try:
26
27
    import paramiko
29
30
except ImportError:
30
31
    paramiko_loaded = False
31
32
 
 
33
# XXX: 20051124 jamesh
 
34
# The tests currently pop up a password prompt when an external ssh
 
35
# is used.  This forces the use of the paramiko implementation.
 
36
if paramiko_loaded:
 
37
    import bzrlib.transport.sftp
 
38
    bzrlib.transport.sftp._ssh_vendor = 'none'
 
39
 
32
40
 
33
41
STUB_SERVER_KEY = """
34
42
-----BEGIN RSA PRIVATE KEY-----
54
62
        threading.Thread.__init__(self)
55
63
        self._callback = callback
56
64
        self._socket = socket.socket()
 
65
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
66
        self._socket.bind(('localhost', 0))
57
67
        self._socket.listen(1)
58
68
        self.port = self._socket.getsockname()[1]
59
69
        self.stop_event = threading.Event()
66
76
    
67
77
    def stop(self):
68
78
        self.stop_event.set()
 
79
        # We should consider waiting for the other thread
 
80
        # to stop, because otherwise we get spurious
 
81
        #   bzr: ERROR: Socket exception: Connection reset by peer (54)
 
82
        # because the test suite finishes before the thread has a chance
 
83
        # to close. (Especially when only running a few tests)
69
84
        
70
85
        
71
86
class TestCaseWithSFTPServer (TestCaseInTempDir):
76
91
    
77
92
    def _run_server(self, s, stop_event):
78
93
        ssh_server = paramiko.Transport(s)
79
 
        key_file = os.path.join(self._root, 'test_rsa.key')
 
94
        key_file = pathjoin(self._root, 'test_rsa.key')
80
95
        file(key_file, 'w').write(STUB_SERVER_KEY)
81
96
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
82
97
        ssh_server.add_server_key(host_key)
83
 
        server = StubServer()
 
98
        server = StubServer(self)
84
99
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
85
100
        event = threading.Event()
86
101
        ssh_server.start_server(event, server)
88
103
        stop_event.wait(30.0)
89
104
 
90
105
    def setUp(self):
 
106
        if not paramiko_loaded:
 
107
            raise TestSkipped('you must have paramiko to run this test')
91
108
        TestCaseInTempDir.setUp(self)
92
109
        self._root = self.test_dir
 
110
        self._is_setup = False
93
111
 
 
112
    def delayed_setup(self):
 
113
        # some tests are just stubs that call setUp and then immediately call
 
114
        # tearDwon.  so don't create the port listener until get_transport is
 
115
        # called and we know we're in an actual test.
 
116
        if self._is_setup:
 
117
            return
94
118
        self._listener = SingleListener(self._run_server)
95
119
        self._listener.setDaemon(True)
96
120
        self._listener.start()        
97
121
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
 
122
        self._is_setup = True
98
123
        
99
124
    def tearDown(self):
100
 
        self._listener.stop()
 
125
        try:
 
126
            self._listener.stop()
 
127
        except AttributeError:
 
128
            pass
101
129
        TestCaseInTempDir.tearDown(self)
102
130
 
103
131
        
104
132
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
105
133
    readonly = False
106
134
 
 
135
    def setUp(self):
 
136
        TestCaseWithSFTPServer.setUp(self)
 
137
        self.sftplogs = []
 
138
 
 
139
    def log(self, *args):
 
140
        """Override the default log to grab sftp server messages"""
 
141
        TestCaseWithSFTPServer.log(self, *args)
 
142
        if args and args[0].startswith('sftpserver'):
 
143
            self.sftplogs.append(args[0])
 
144
 
107
145
    def get_transport(self):
 
146
        self.delayed_setup()
108
147
        from bzrlib.transport.sftp import SFTPTransport
109
148
        url = self._sftp_url
110
149
        return SFTPTransport(url)
111
150
 
112
 
if not paramiko_loaded:
113
 
    del SFTPTransportTest
 
151
    def test_sftp_locks(self):
 
152
        from bzrlib.errors import LockError
 
153
        t = self.get_transport()
 
154
 
 
155
        l = t.lock_write('bogus')
 
156
        self.failUnlessExists('bogus.write-lock')
 
157
 
 
158
        # Don't wait for the lock, locking an already locked
 
159
        # file should raise an assert
 
160
        self.assertRaises(LockError, t.lock_write, 'bogus')
 
161
 
 
162
        l.unlock()
 
163
        self.failIf(lexists('bogus.write-lock'))
 
164
 
 
165
        open('something.write-lock', 'wb').write('fake lock\n')
 
166
        self.assertRaises(LockError, t.lock_write, 'something')
 
167
        os.remove('something.write-lock')
 
168
 
 
169
        l = t.lock_write('something')
 
170
 
 
171
        l2 = t.lock_write('bogus')
 
172
 
 
173
        l.unlock()
 
174
        l2.unlock()
 
175
 
 
176
    def test_multiple_connections(self):
 
177
        t = self.get_transport()
 
178
        self.assertEquals(self.sftplogs, 
 
179
                ['sftpserver - authorizing: foo'
 
180
               , 'sftpserver - channel request: session, 1'])
 
181
        self.sftplogs = []
 
182
        # The second request should reuse the first connection
 
183
        # SingleListener only allows for a single connection,
 
184
        # So the next line fails unless the connection is reused
 
185
        t2 = self.get_transport()
 
186
        self.assertEquals(self.sftplogs, [])
 
187
 
 
188
 
 
189
class FakeSFTPTransport (object):
 
190
    _sftp = object()
 
191
fake = FakeSFTPTransport()
 
192
 
 
193
 
 
194
class SFTPNonServerTest(TestCase):
 
195
    def setUp(self):
 
196
        TestCase.setUp(self)
 
197
        if not paramiko_loaded:
 
198
            raise TestSkipped('you must have paramiko to run this test')
 
199
 
 
200
    def test_parse_url(self):
 
201
        from bzrlib.transport.sftp import SFTPTransport
 
202
        s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
 
203
        self.assertEquals(s._host, 'simple.example.com')
 
204
        self.assertEquals(s._port, None)
 
205
        self.assertEquals(s._path, '/home/source')
 
206
        self.failUnless(s._password is None)
 
207
 
 
208
        self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source')
 
209
        
 
210
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
 
211
        self.assertEquals(s._host, 'example.com')
 
212
        self.assertEquals(s._port, 2222)
 
213
        self.assertEquals(s._username, 'robey')
 
214
        self.assertEquals(s._password, 'h@t')
 
215
        self.assertEquals(s._path, 'relative')
 
216
 
 
217
        # Base should not keep track of the password
 
218
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative')
 
219
 
 
220
        # Double slash should be accepted instead of using %2F
 
221
        s = SFTPTransport('sftp://user@example.com:22//absolute/path', clone_from=fake)
 
222
        self.assertEquals(s._host, 'example.com')
 
223
        self.assertEquals(s._port, 22)
 
224
        self.assertEquals(s._username, 'user')
 
225
        self.assertEquals(s._password, None)
 
226
        self.assertEquals(s._path, '/absolute/path')
 
227
 
 
228
        # Also, don't show the port if it is the default 22
 
229
        self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path')
 
230
 
 
231
    def test_relpath(self):
 
232
        from bzrlib.transport.sftp import SFTPTransport
 
233
        from bzrlib.errors import PathNotChild
 
234
 
 
235
        s = SFTPTransport('sftp://user@host.com//abs/path', clone_from=fake)
 
236
        self.assertEquals(s.relpath('sftp://user@host.com//abs/path/sub'), 'sub')
 
237
        # Can't test this one, because we actually get an AssertionError
 
238
        # TODO: Consider raising an exception rather than an assert
 
239
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com//abs/path/sub')
 
240
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com//abs/path/sub')
 
241
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com//abs/path/sub')
 
242
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33//abs/path/sub')
 
243
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/abs/path/sub')
 
244
 
 
245
        # Make sure it works when we don't supply a username
 
246
        s = SFTPTransport('sftp://host.com//abs/path', clone_from=fake)
 
247
        self.assertEquals(s.relpath('sftp://host.com//abs/path/sub'), 'sub')
 
248
 
 
249
        # Make sure it works when parts of the path will be url encoded
 
250
        # TODO: These may be incorrect, we might need to urllib.urlencode() before
 
251
        # we pass the paths into the SFTPTransport constructor
 
252
        s = SFTPTransport('sftp://host.com/dev/,path', clone_from=fake)
 
253
        self.assertEquals(s.relpath('sftp://host.com/dev/,path/sub'), 'sub')
 
254
        s = SFTPTransport('sftp://host.com/dev/%path', clone_from=fake)
 
255
        self.assertEquals(s.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
256
 
 
257
    def test_parse_invalid_url(self):
 
258
        from bzrlib.transport.sftp import SFTPTransport, TransportError
 
259
        try:
 
260
            s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
 
261
                              clone_from=fake)
 
262
            self.fail('expected exception not raised')
 
263
        except TransportError, e:
 
264
            self.assertEquals(str(e), 
 
265
                    '~janneke: invalid port number')
 
266
 
 
267
 
 
268
class SFTPBranchTest(TestCaseWithSFTPServer):
 
269
    """Test some stuff when accessing a bzr Branch over sftp"""
 
270
 
 
271
    def test_lock_file(self):
 
272
        """Make sure that a Branch accessed over sftp tries to lock itself."""
 
273
        from bzrlib.branch import Branch
 
274
 
 
275
        self.delayed_setup()
 
276
        b = Branch.initialize(self._sftp_url)
 
277
        self.failUnlessExists('.bzr/')
 
278
        self.failUnlessExists('.bzr/branch-format')
 
279
        self.failUnlessExists('.bzr/branch-lock')
 
280
 
 
281
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
282
        b.lock_write()
 
283
        self.failUnlessExists('.bzr/branch-lock.write-lock')
 
284
        b.unlock()
 
285
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
286
 
 
287
    def test_no_working_tree(self):
 
288
        from bzrlib.branch import Branch
 
289
        self.delayed_setup()
 
290
        b = Branch.initialize(self._sftp_url)
 
291
        self.assertRaises(errors.NoWorkingTree, b.working_tree)
 
292
 
 
293
    def test_push_support(self):
 
294
        from bzrlib.branch import Branch
 
295
        self.delayed_setup()
 
296
 
 
297
        self.build_tree(['a/', 'a/foo'])
 
298
        b = Branch.initialize('a')
 
299
        t = b.working_tree()
 
300
        t.add('foo')
 
301
        t.commit('foo', rev_id='a1')
 
302
 
 
303
        os.mkdir('b')
 
304
        b2 = Branch.initialize(self._sftp_url + 'b')
 
305
        b2.pull(b)
 
306
 
 
307
        self.assertEquals(b2.revision_history(), ['a1'])
 
308
 
 
309
        open('a/foo', 'wt').write('something new in foo\n')
 
310
        t.commit('new', rev_id='a2')
 
311
        b2.pull(b)
 
312
 
 
313
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
 
314
 
 
315