~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp.py

forgot my self.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
import os
18
18
import socket
19
19
import threading
20
 
import unittest
21
20
 
22
 
from bzrlib.selftest import TestCaseInTempDir
23
 
from bzrlib.selftest.testtransport import TestTransportMixIn
 
21
from bzrlib.tests import TestCaseInTempDir, TestCase
 
22
from bzrlib.tests.test_transport import TestTransportMixIn
 
23
import bzrlib.errors as errors
24
24
 
25
25
try:
26
26
    import paramiko
29
29
except ImportError:
30
30
    paramiko_loaded = False
31
31
 
 
32
# XXX: 20051124 jamesh
 
33
# The tests currently pop up a password prompt when an external ssh
 
34
# is used.  This forces the use of the paramiko implementation.
 
35
if paramiko_loaded:
 
36
    import bzrlib.transport.sftp
 
37
    bzrlib.transport.sftp._ssh_vendor = 'none'
 
38
 
32
39
 
33
40
STUB_SERVER_KEY = """
34
41
-----BEGIN RSA PRIVATE KEY-----
54
61
        threading.Thread.__init__(self)
55
62
        self._callback = callback
56
63
        self._socket = socket.socket()
 
64
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
65
        self._socket.bind(('localhost', 0))
57
66
        self._socket.listen(1)
58
67
        self.port = self._socket.getsockname()[1]
59
68
        self.stop_event = threading.Event()
66
75
    
67
76
    def stop(self):
68
77
        self.stop_event.set()
 
78
        # We should consider waiting for the other thread
 
79
        # to stop, because otherwise we get spurious
 
80
        #   bzr: ERROR: Socket exception: Connection reset by peer (54)
 
81
        # because the test suite finishes before the thread has a chance
 
82
        # to close. (Especially when only running a few tests)
69
83
        
70
84
        
71
85
class TestCaseWithSFTPServer (TestCaseInTempDir):
80
94
        file(key_file, 'w').write(STUB_SERVER_KEY)
81
95
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
82
96
        ssh_server.add_server_key(host_key)
83
 
        server = StubServer()
 
97
        server = StubServer(self)
84
98
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
85
99
        event = threading.Event()
86
100
        ssh_server.start_server(event, server)
90
104
    def setUp(self):
91
105
        TestCaseInTempDir.setUp(self)
92
106
        self._root = self.test_dir
 
107
        self._is_setup = False
93
108
 
 
109
    def delayed_setup(self):
 
110
        # some tests are just stubs that call setUp and then immediately call
 
111
        # tearDwon.  so don't create the port listener until get_transport is
 
112
        # called and we know we're in an actual test.
 
113
        if self._is_setup:
 
114
            return
94
115
        self._listener = SingleListener(self._run_server)
95
116
        self._listener.setDaemon(True)
96
117
        self._listener.start()        
97
118
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
 
119
        self._is_setup = True
98
120
        
99
121
    def tearDown(self):
100
 
        self._listener.stop()
 
122
        try:
 
123
            self._listener.stop()
 
124
        except AttributeError:
 
125
            pass
101
126
        TestCaseInTempDir.tearDown(self)
102
127
 
103
128
        
104
129
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
105
130
    readonly = False
106
131
 
 
132
    def setUp(self):
 
133
        TestCaseWithSFTPServer.setUp(self)
 
134
        self.sftplogs = []
 
135
 
 
136
    def log(self, *args):
 
137
        """Override the default log to grab sftp server messages"""
 
138
        TestCaseWithSFTPServer.log(self, *args)
 
139
        if args and args[0].startswith('sftpserver'):
 
140
            self.sftplogs.append(args[0])
 
141
 
107
142
    def get_transport(self):
 
143
        self.delayed_setup()
108
144
        from bzrlib.transport.sftp import SFTPTransport
109
145
        url = self._sftp_url
110
146
        return SFTPTransport(url)
111
147
 
 
148
    def test_sftp_locks(self):
 
149
        from bzrlib.errors import LockError
 
150
        t = self.get_transport()
 
151
 
 
152
        l = t.lock_write('bogus')
 
153
        self.failUnlessExists('bogus.write-lock')
 
154
 
 
155
        # Don't wait for the lock, locking an already locked
 
156
        # file should raise an assert
 
157
        self.assertRaises(LockError, t.lock_write, 'bogus')
 
158
 
 
159
        l.unlock()
 
160
        self.failIf(os.path.lexists('bogus.write-lock'))
 
161
 
 
162
        open('something.write-lock', 'wb').write('fake lock\n')
 
163
        self.assertRaises(LockError, t.lock_write, 'something')
 
164
        os.remove('something.write-lock')
 
165
 
 
166
        l = t.lock_write('something')
 
167
 
 
168
        l2 = t.lock_write('bogus')
 
169
 
 
170
        l.unlock()
 
171
        l2.unlock()
 
172
 
 
173
    def test_multiple_connections(self):
 
174
        t = self.get_transport()
 
175
        self.assertEquals(self.sftplogs, 
 
176
                ['sftpserver - authorizing: foo'
 
177
               , 'sftpserver - channel request: session, 1'])
 
178
        self.sftplogs = []
 
179
        # The second request should reuse the first connection
 
180
        # SingleListener only allows for a single connection,
 
181
        # So the next line fails unless the connection is reused
 
182
        t2 = self.get_transport()
 
183
        self.assertEquals(self.sftplogs, [])
 
184
 
 
185
 
 
186
class FakeSFTPTransport (object):
 
187
    _sftp = object()
 
188
fake = FakeSFTPTransport()
 
189
 
 
190
 
 
191
class SFTPNonServerTest(TestCase):
 
192
    def test_parse_url(self):
 
193
        from bzrlib.transport.sftp import SFTPTransport
 
194
        s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
 
195
        self.assertEquals(s._host, 'simple.example.com')
 
196
        self.assertEquals(s._port, None)
 
197
        self.assertEquals(s._path, '/home/source')
 
198
        self.failUnless(s._password is None)
 
199
 
 
200
        self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source')
 
201
        
 
202
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
 
203
        self.assertEquals(s._host, 'example.com')
 
204
        self.assertEquals(s._port, 2222)
 
205
        self.assertEquals(s._username, 'robey')
 
206
        self.assertEquals(s._password, 'h@t')
 
207
        self.assertEquals(s._path, 'relative')
 
208
 
 
209
        # Base should not keep track of the password
 
210
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative')
 
211
 
 
212
        # Double slash should be accepted instead of using %2F
 
213
        s = SFTPTransport('sftp://user@example.com:22//absolute/path', clone_from=fake)
 
214
        self.assertEquals(s._host, 'example.com')
 
215
        self.assertEquals(s._port, 22)
 
216
        self.assertEquals(s._username, 'user')
 
217
        self.assertEquals(s._password, None)
 
218
        self.assertEquals(s._path, '/absolute/path')
 
219
 
 
220
        # Also, don't show the port if it is the default 22
 
221
        self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path')
 
222
 
 
223
    def test_relpath(self):
 
224
        from bzrlib.transport.sftp import SFTPTransport
 
225
        from bzrlib.errors import PathNotChild
 
226
 
 
227
        s = SFTPTransport('sftp://user@host.com//abs/path', clone_from=fake)
 
228
        self.assertEquals(s.relpath('sftp://user@host.com//abs/path/sub'), 'sub')
 
229
        # Can't test this one, because we actually get an AssertionError
 
230
        # TODO: Consider raising an exception rather than an assert
 
231
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com//abs/path/sub')
 
232
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com//abs/path/sub')
 
233
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com//abs/path/sub')
 
234
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33//abs/path/sub')
 
235
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/abs/path/sub')
 
236
 
 
237
        # Make sure it works when we don't supply a username
 
238
        s = SFTPTransport('sftp://host.com//abs/path', clone_from=fake)
 
239
        self.assertEquals(s.relpath('sftp://host.com//abs/path/sub'), 'sub')
 
240
 
 
241
        # Make sure it works when parts of the path will be url encoded
 
242
        # TODO: These may be incorrect, we might need to urllib.urlencode() before
 
243
        # we pass the paths into the SFTPTransport constructor
 
244
        s = SFTPTransport('sftp://host.com/dev/,path', clone_from=fake)
 
245
        self.assertEquals(s.relpath('sftp://host.com/dev/,path/sub'), 'sub')
 
246
        s = SFTPTransport('sftp://host.com/dev/%path', clone_from=fake)
 
247
        self.assertEquals(s.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
248
 
 
249
    def test_parse_invalid_url(self):
 
250
        from bzrlib.transport.sftp import SFTPTransport, TransportError
 
251
        try:
 
252
            s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
 
253
                              clone_from=fake)
 
254
            self.fail('expected exception not raised')
 
255
        except TransportError, e:
 
256
            self.assertEquals(str(e), 
 
257
                    '~janneke: invalid port number')
 
258
 
 
259
 
 
260
class SFTPBranchTest(TestCaseWithSFTPServer):
 
261
    """Test some stuff when accessing a bzr Branch over sftp"""
 
262
 
 
263
    def test_lock_file(self):
 
264
        """Make sure that a Branch accessed over sftp tries to lock itself."""
 
265
        from bzrlib.branch import Branch
 
266
 
 
267
        self.delayed_setup()
 
268
        b = Branch.initialize(self._sftp_url)
 
269
        self.failUnlessExists('.bzr/')
 
270
        self.failUnlessExists('.bzr/branch-format')
 
271
        self.failUnlessExists('.bzr/branch-lock')
 
272
 
 
273
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
 
274
        b.lock_write()
 
275
        self.failUnlessExists('.bzr/branch-lock.write-lock')
 
276
        b.unlock()
 
277
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
 
278
 
 
279
    def test_no_working_tree(self):
 
280
        from bzrlib.branch import Branch
 
281
        self.delayed_setup()
 
282
        b = Branch.initialize(self._sftp_url)
 
283
        self.assertRaises(errors.NoWorkingTree, b.working_tree)
 
284
 
 
285
    def test_push_support(self):
 
286
        from bzrlib.branch import Branch
 
287
        self.delayed_setup()
 
288
 
 
289
        self.build_tree(['a/', 'a/foo'])
 
290
        b = Branch.initialize('a')
 
291
        t = b.working_tree()
 
292
        t.add('foo')
 
293
        t.commit('foo', rev_id='a1')
 
294
 
 
295
        os.mkdir('b')
 
296
        b2 = Branch.initialize(self._sftp_url + 'b')
 
297
        b2.pull(b)
 
298
 
 
299
        self.assertEquals(b2.revision_history(), ['a1'])
 
300
 
 
301
 
112
302
if not paramiko_loaded:
 
303
    # TODO: Skip these
113
304
    del SFTPTransportTest
 
305
    del SFTPNonServerTest
 
306
    del SFTPBranchTest