~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp.py

  • Committer: John Arbash Meinel
  • Date: 2005-12-01 19:27:48 UTC
  • mto: (1185.50.19 bzr-jam-integration)
  • mto: This revision was merged to the branch mainline in revision 1532.
  • Revision ID: john@arbash-meinel.com-20051201192748-369238cd06ecf7e8
Added osutils.mkdtemp()

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
import os
 
18
import socket
 
19
import threading
 
20
 
 
21
from bzrlib.tests import TestCaseInTempDir, TestCase
 
22
from bzrlib.tests.test_transport import TestTransportMixIn
 
23
import bzrlib.errors as errors
 
24
from bzrlib.osutils import pathjoin, lexists
 
25
 
 
26
try:
 
27
    import paramiko
 
28
    from stub_sftp import StubServer, StubSFTPServer
 
29
    paramiko_loaded = True
 
30
except ImportError:
 
31
    paramiko_loaded = False
 
32
 
 
33
# XXX: 20051124 jamesh
 
34
# The tests currently pop up a password prompt when an external ssh
 
35
# is used.  This forces the use of the paramiko implementation.
 
36
if paramiko_loaded:
 
37
    import bzrlib.transport.sftp
 
38
    bzrlib.transport.sftp._ssh_vendor = 'none'
 
39
 
 
40
 
 
41
STUB_SERVER_KEY = """
 
42
-----BEGIN RSA PRIVATE KEY-----
 
43
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
44
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
45
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
46
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
47
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
48
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
49
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
50
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
51
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
52
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
53
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
54
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
55
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
56
-----END RSA PRIVATE KEY-----
 
57
"""
 
58
    
 
59
 
 
60
class SingleListener (threading.Thread):
 
61
    def __init__(self, callback):
 
62
        threading.Thread.__init__(self)
 
63
        self._callback = callback
 
64
        self._socket = socket.socket()
 
65
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
66
        self._socket.bind(('localhost', 0))
 
67
        self._socket.listen(1)
 
68
        self.port = self._socket.getsockname()[1]
 
69
        self.stop_event = threading.Event()
 
70
 
 
71
    def run(self):
 
72
        s, _ = self._socket.accept()
 
73
        # now close the listen socket
 
74
        self._socket.close()
 
75
        self._callback(s, self.stop_event)
 
76
    
 
77
    def stop(self):
 
78
        self.stop_event.set()
 
79
        # We should consider waiting for the other thread
 
80
        # to stop, because otherwise we get spurious
 
81
        #   bzr: ERROR: Socket exception: Connection reset by peer (54)
 
82
        # because the test suite finishes before the thread has a chance
 
83
        # to close. (Especially when only running a few tests)
 
84
        
 
85
        
 
86
class TestCaseWithSFTPServer (TestCaseInTempDir):
 
87
    """
 
88
    Execute a test case with a stub SFTP server, serving files from the local
 
89
    filesystem over the loopback network.
 
90
    """
 
91
    
 
92
    def _run_server(self, s, stop_event):
 
93
        ssh_server = paramiko.Transport(s)
 
94
        key_file = pathjoin(self._root, 'test_rsa.key')
 
95
        file(key_file, 'w').write(STUB_SERVER_KEY)
 
96
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
97
        ssh_server.add_server_key(host_key)
 
98
        server = StubServer(self)
 
99
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
 
100
        event = threading.Event()
 
101
        ssh_server.start_server(event, server)
 
102
        event.wait(5.0)
 
103
        stop_event.wait(30.0)
 
104
 
 
105
    def setUp(self):
 
106
        TestCaseInTempDir.setUp(self)
 
107
        self._root = self.test_dir
 
108
        self._is_setup = False
 
109
 
 
110
    def delayed_setup(self):
 
111
        # some tests are just stubs that call setUp and then immediately call
 
112
        # tearDwon.  so don't create the port listener until get_transport is
 
113
        # called and we know we're in an actual test.
 
114
        if self._is_setup:
 
115
            return
 
116
        self._listener = SingleListener(self._run_server)
 
117
        self._listener.setDaemon(True)
 
118
        self._listener.start()        
 
119
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
 
120
        self._is_setup = True
 
121
        
 
122
    def tearDown(self):
 
123
        try:
 
124
            self._listener.stop()
 
125
        except AttributeError:
 
126
            pass
 
127
        TestCaseInTempDir.tearDown(self)
 
128
 
 
129
        
 
130
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
 
131
    readonly = False
 
132
 
 
133
    def setUp(self):
 
134
        TestCaseWithSFTPServer.setUp(self)
 
135
        self.sftplogs = []
 
136
 
 
137
    def log(self, *args):
 
138
        """Override the default log to grab sftp server messages"""
 
139
        TestCaseWithSFTPServer.log(self, *args)
 
140
        if args and args[0].startswith('sftpserver'):
 
141
            self.sftplogs.append(args[0])
 
142
 
 
143
    def get_transport(self):
 
144
        self.delayed_setup()
 
145
        from bzrlib.transport.sftp import SFTPTransport
 
146
        url = self._sftp_url
 
147
        return SFTPTransport(url)
 
148
 
 
149
    def test_sftp_locks(self):
 
150
        from bzrlib.errors import LockError
 
151
        t = self.get_transport()
 
152
 
 
153
        l = t.lock_write('bogus')
 
154
        self.failUnlessExists('bogus.write-lock')
 
155
 
 
156
        # Don't wait for the lock, locking an already locked
 
157
        # file should raise an assert
 
158
        self.assertRaises(LockError, t.lock_write, 'bogus')
 
159
 
 
160
        l.unlock()
 
161
        self.failIf(lexists('bogus.write-lock'))
 
162
 
 
163
        open('something.write-lock', 'wb').write('fake lock\n')
 
164
        self.assertRaises(LockError, t.lock_write, 'something')
 
165
        os.remove('something.write-lock')
 
166
 
 
167
        l = t.lock_write('something')
 
168
 
 
169
        l2 = t.lock_write('bogus')
 
170
 
 
171
        l.unlock()
 
172
        l2.unlock()
 
173
 
 
174
    def test_multiple_connections(self):
 
175
        t = self.get_transport()
 
176
        self.assertEquals(self.sftplogs, 
 
177
                ['sftpserver - authorizing: foo'
 
178
               , 'sftpserver - channel request: session, 1'])
 
179
        self.sftplogs = []
 
180
        # The second request should reuse the first connection
 
181
        # SingleListener only allows for a single connection,
 
182
        # So the next line fails unless the connection is reused
 
183
        t2 = self.get_transport()
 
184
        self.assertEquals(self.sftplogs, [])
 
185
 
 
186
 
 
187
class FakeSFTPTransport (object):
 
188
    _sftp = object()
 
189
fake = FakeSFTPTransport()
 
190
 
 
191
 
 
192
class SFTPNonServerTest(TestCase):
 
193
    def test_parse_url(self):
 
194
        from bzrlib.transport.sftp import SFTPTransport
 
195
        s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
 
196
        self.assertEquals(s._host, 'simple.example.com')
 
197
        self.assertEquals(s._port, None)
 
198
        self.assertEquals(s._path, '/home/source')
 
199
        self.failUnless(s._password is None)
 
200
 
 
201
        self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source')
 
202
        
 
203
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
 
204
        self.assertEquals(s._host, 'example.com')
 
205
        self.assertEquals(s._port, 2222)
 
206
        self.assertEquals(s._username, 'robey')
 
207
        self.assertEquals(s._password, 'h@t')
 
208
        self.assertEquals(s._path, 'relative')
 
209
 
 
210
        # Base should not keep track of the password
 
211
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative')
 
212
 
 
213
        # Double slash should be accepted instead of using %2F
 
214
        s = SFTPTransport('sftp://user@example.com:22//absolute/path', clone_from=fake)
 
215
        self.assertEquals(s._host, 'example.com')
 
216
        self.assertEquals(s._port, 22)
 
217
        self.assertEquals(s._username, 'user')
 
218
        self.assertEquals(s._password, None)
 
219
        self.assertEquals(s._path, '/absolute/path')
 
220
 
 
221
        # Also, don't show the port if it is the default 22
 
222
        self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path')
 
223
 
 
224
    def test_relpath(self):
 
225
        from bzrlib.transport.sftp import SFTPTransport
 
226
        from bzrlib.errors import NonRelativePath
 
227
 
 
228
        s = SFTPTransport('sftp://user@host.com//abs/path', clone_from=fake)
 
229
        self.assertEquals(s.relpath('sftp://user@host.com//abs/path/sub'), 'sub')
 
230
        # Can't test this one, because we actually get an AssertionError
 
231
        # TODO: Consider raising an exception rather than an assert
 
232
        #self.assertRaises(NonRelativePath, s.relpath, 'http://user@host.com//abs/path/sub')
 
233
        self.assertRaises(NonRelativePath, s.relpath, 'sftp://user2@host.com//abs/path/sub')
 
234
        self.assertRaises(NonRelativePath, s.relpath, 'sftp://user@otherhost.com//abs/path/sub')
 
235
        self.assertRaises(NonRelativePath, s.relpath, 'sftp://user@host.com:33//abs/path/sub')
 
236
        self.assertRaises(NonRelativePath, s.relpath, 'sftp://user@host.com/abs/path/sub')
 
237
 
 
238
        # Make sure it works when we don't supply a username
 
239
        s = SFTPTransport('sftp://host.com//abs/path', clone_from=fake)
 
240
        self.assertEquals(s.relpath('sftp://host.com//abs/path/sub'), 'sub')
 
241
 
 
242
        # Make sure it works when parts of the path will be url encoded
 
243
        # TODO: These may be incorrect, we might need to urllib.urlencode() before
 
244
        # we pass the paths into the SFTPTransport constructor
 
245
        s = SFTPTransport('sftp://host.com/dev/,path', clone_from=fake)
 
246
        self.assertEquals(s.relpath('sftp://host.com/dev/,path/sub'), 'sub')
 
247
        s = SFTPTransport('sftp://host.com/dev/%path', clone_from=fake)
 
248
        self.assertEquals(s.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
249
 
 
250
    def test_parse_invalid_url(self):
 
251
        from bzrlib.transport.sftp import SFTPTransport, SFTPTransportError
 
252
        try:
 
253
            s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
 
254
                              clone_from=fake)
 
255
            self.fail('expected exception not raised')
 
256
        except SFTPTransportError, e:
 
257
            self.assertEquals(str(e), 
 
258
                    '~janneke: invalid port number')
 
259
 
 
260
 
 
261
class SFTPBranchTest(TestCaseWithSFTPServer):
 
262
    """Test some stuff when accessing a bzr Branch over sftp"""
 
263
 
 
264
    def test_lock_file(self):
 
265
        """Make sure that a Branch accessed over sftp tries to lock itself."""
 
266
        from bzrlib.branch import Branch
 
267
 
 
268
        self.delayed_setup()
 
269
        b = Branch.initialize(self._sftp_url)
 
270
        self.failUnlessExists('.bzr/')
 
271
        self.failUnlessExists('.bzr/branch-format')
 
272
        self.failUnlessExists('.bzr/branch-lock')
 
273
 
 
274
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
275
        b.lock_write()
 
276
        self.failUnlessExists('.bzr/branch-lock.write-lock')
 
277
        b.unlock()
 
278
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
279
 
 
280
    def test_no_working_tree(self):
 
281
        from bzrlib.branch import Branch
 
282
        self.delayed_setup()
 
283
        b = Branch.initialize(self._sftp_url)
 
284
        self.assertRaises(errors.NoWorkingTree, b.working_tree)
 
285
 
 
286
    def test_push_support(self):
 
287
        from bzrlib.branch import Branch
 
288
        self.delayed_setup()
 
289
 
 
290
        self.build_tree(['a/', 'a/foo'])
 
291
        b = Branch.initialize('a')
 
292
        t = b.working_tree()
 
293
        t.add('foo')
 
294
        t.commit('foo', rev_id='a1')
 
295
 
 
296
        os.mkdir('b')
 
297
        b2 = Branch.initialize(self._sftp_url + 'b')
 
298
        b2.pull(b)
 
299
 
 
300
        self.assertEquals(b2.revision_history(), ['a1'])
 
301
 
 
302
 
 
303
if not paramiko_loaded:
 
304
    # TODO: Skip these
 
305
    del SFTPTransportTest
 
306
    del SFTPNonServerTest
 
307
    del SFTPBranchTest