~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp.py

  • Committer: Robert Collins
  • Date: 2005-10-03 05:54:35 UTC
  • mto: (1393.1.30)
  • mto: This revision was merged to the branch mainline in revision 1400.
  • Revision ID: robertc@robertcollins.net-20051003055434-c8ebd30d1de10247
move exporting functionality into inventory.py - uncovers bug in symlink support

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
2
 
 
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
 
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
 
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
import os
18
 
import socket
19
 
import threading
20
 
 
21
 
from bzrlib.tests import TestCaseInTempDir, TestCase
22
 
from bzrlib.tests.test_transport import TestTransportMixIn
23
 
 
24
 
try:
25
 
    import paramiko
26
 
    from stub_sftp import StubServer, StubSFTPServer
27
 
    paramiko_loaded = True
28
 
except ImportError:
29
 
    paramiko_loaded = False
30
 
 
31
 
# XXX: 20051124 jamesh
32
 
# The tests currently pop up a password prompt when an external ssh
33
 
# is used.  This forces the use of the paramiko implementation.
34
 
if paramiko_loaded:
35
 
    import bzrlib.transport.sftp
36
 
    bzrlib.transport.sftp._ssh_vendor = 'none'
37
 
 
38
 
 
39
 
STUB_SERVER_KEY = """
40
 
-----BEGIN RSA PRIVATE KEY-----
41
 
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
42
 
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
43
 
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
44
 
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
45
 
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
46
 
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
47
 
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
48
 
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
49
 
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
50
 
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
51
 
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
52
 
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
53
 
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
54
 
-----END RSA PRIVATE KEY-----
55
 
"""
56
 
    
57
 
 
58
 
class SingleListener (threading.Thread):
59
 
    def __init__(self, callback):
60
 
        threading.Thread.__init__(self)
61
 
        self._callback = callback
62
 
        self._socket = socket.socket()
63
 
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
64
 
        self._socket.bind(('localhost', 0))
65
 
        self._socket.listen(1)
66
 
        self.port = self._socket.getsockname()[1]
67
 
        self.stop_event = threading.Event()
68
 
 
69
 
    def run(self):
70
 
        s, _ = self._socket.accept()
71
 
        # now close the listen socket
72
 
        self._socket.close()
73
 
        self._callback(s, self.stop_event)
74
 
    
75
 
    def stop(self):
76
 
        self.stop_event.set()
77
 
        # We should consider waiting for the other thread
78
 
        # to stop, because otherwise we get spurious
79
 
        #   bzr: ERROR: Socket exception: Connection reset by peer (54)
80
 
        # because the test suite finishes before the thread has a chance
81
 
        # to close. (Especially when only running a few tests)
82
 
        
83
 
        
84
 
class TestCaseWithSFTPServer (TestCaseInTempDir):
85
 
    """
86
 
    Execute a test case with a stub SFTP server, serving files from the local
87
 
    filesystem over the loopback network.
88
 
    """
89
 
    
90
 
    def _run_server(self, s, stop_event):
91
 
        ssh_server = paramiko.Transport(s)
92
 
        key_file = os.path.join(self._root, 'test_rsa.key')
93
 
        file(key_file, 'w').write(STUB_SERVER_KEY)
94
 
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
95
 
        ssh_server.add_server_key(host_key)
96
 
        server = StubServer(self)
97
 
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
98
 
        event = threading.Event()
99
 
        ssh_server.start_server(event, server)
100
 
        event.wait(5.0)
101
 
        stop_event.wait(30.0)
102
 
 
103
 
    def setUp(self):
104
 
        TestCaseInTempDir.setUp(self)
105
 
        self._root = self.test_dir
106
 
        self._is_setup = False
107
 
 
108
 
    def delayed_setup(self):
109
 
        # some tests are just stubs that call setUp and then immediately call
110
 
        # tearDwon.  so don't create the port listener until get_transport is
111
 
        # called and we know we're in an actual test.
112
 
        if self._is_setup:
113
 
            return
114
 
        self._listener = SingleListener(self._run_server)
115
 
        self._listener.setDaemon(True)
116
 
        self._listener.start()        
117
 
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
118
 
        self._is_setup = True
119
 
        
120
 
    def tearDown(self):
121
 
        try:
122
 
            self._listener.stop()
123
 
        except AttributeError:
124
 
            pass
125
 
        TestCaseInTempDir.tearDown(self)
126
 
 
127
 
        
128
 
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
129
 
    readonly = False
130
 
 
131
 
    def setUp(self):
132
 
        TestCaseWithSFTPServer.setUp(self)
133
 
        self.sftplogs = []
134
 
 
135
 
    def log(self, *args):
136
 
        """Override the default log to grab sftp server messages"""
137
 
        TestCaseWithSFTPServer.log(self, *args)
138
 
        if args and args[0].startswith('sftpserver'):
139
 
            self.sftplogs.append(args[0])
140
 
 
141
 
    def get_transport(self):
142
 
        self.delayed_setup()
143
 
        from bzrlib.transport.sftp import SFTPTransport
144
 
        url = self._sftp_url
145
 
        return SFTPTransport(url)
146
 
 
147
 
    def test_sftp_locks(self):
148
 
        from bzrlib.errors import LockError
149
 
        t = self.get_transport()
150
 
 
151
 
        l = t.lock_write('bogus')
152
 
        self.failUnlessExists('bogus.write-lock')
153
 
 
154
 
        # Don't wait for the lock, locking an already locked
155
 
        # file should raise an assert
156
 
        self.assertRaises(LockError, t.lock_write, 'bogus')
157
 
 
158
 
        l.unlock()
159
 
        self.failIf(os.path.lexists('bogus.write-lock'))
160
 
 
161
 
        open('something.write-lock', 'wb').write('fake lock\n')
162
 
        self.assertRaises(LockError, t.lock_write, 'something')
163
 
        os.remove('something.write-lock')
164
 
 
165
 
        l = t.lock_write('something')
166
 
 
167
 
        l2 = t.lock_write('bogus')
168
 
 
169
 
        l.unlock()
170
 
        l2.unlock()
171
 
 
172
 
    def test_multiple_connections(self):
173
 
        t = self.get_transport()
174
 
        self.assertEquals(self.sftplogs, 
175
 
                ['sftpserver - authorizing: foo'
176
 
               , 'sftpserver - channel request: session, 1'])
177
 
        self.sftplogs = []
178
 
        # The second request should reuse the first connection
179
 
        # SingleListener only allows for a single connection,
180
 
        # So the next line fails unless the connection is reused
181
 
        t2 = self.get_transport()
182
 
        self.assertEquals(self.sftplogs, [])
183
 
 
184
 
 
185
 
class FakeSFTPTransport (object):
186
 
    _sftp = object()
187
 
fake = FakeSFTPTransport()
188
 
 
189
 
 
190
 
class SFTPNonServerTest(TestCase):
191
 
    def test_parse_url(self):
192
 
        from bzrlib.transport.sftp import SFTPTransport
193
 
        s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
194
 
        self.assertEquals(s._host, 'simple.example.com')
195
 
        self.assertEquals(s._port, None)
196
 
        self.assertEquals(s._path, '/home/source')
197
 
        self.failUnless(s._password is None)
198
 
 
199
 
        self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source')
200
 
        
201
 
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
202
 
        self.assertEquals(s._host, 'example.com')
203
 
        self.assertEquals(s._port, 2222)
204
 
        self.assertEquals(s._username, 'robey')
205
 
        self.assertEquals(s._password, 'h@t')
206
 
        self.assertEquals(s._path, 'relative')
207
 
 
208
 
        # Base should not keep track of the password
209
 
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative')
210
 
 
211
 
        # Double slash should be accepted instead of using %2F
212
 
        s = SFTPTransport('sftp://user@example.com:22//absolute/path', clone_from=fake)
213
 
        self.assertEquals(s._host, 'example.com')
214
 
        self.assertEquals(s._port, 22)
215
 
        self.assertEquals(s._username, 'user')
216
 
        self.assertEquals(s._password, None)
217
 
        self.assertEquals(s._path, '/absolute/path')
218
 
 
219
 
        # Also, don't show the port if it is the default 22
220
 
        self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path')
221
 
 
222
 
    def test_relpath(self):
223
 
        from bzrlib.transport.sftp import SFTPTransport
224
 
        from bzrlib.errors import NonRelativePath
225
 
 
226
 
        s = SFTPTransport('sftp://user@host.com//abs/path', clone_from=fake)
227
 
        self.assertEquals(s.relpath('sftp://user@host.com//abs/path/sub'), 'sub')
228
 
        # Can't test this one, because we actually get an AssertionError
229
 
        # TODO: Consider raising an exception rather than an assert
230
 
        #self.assertRaises(NonRelativePath, s.relpath, 'http://user@host.com//abs/path/sub')
231
 
        self.assertRaises(NonRelativePath, s.relpath, 'sftp://user2@host.com//abs/path/sub')
232
 
        self.assertRaises(NonRelativePath, s.relpath, 'sftp://user@otherhost.com//abs/path/sub')
233
 
        self.assertRaises(NonRelativePath, s.relpath, 'sftp://user@host.com:33//abs/path/sub')
234
 
        self.assertRaises(NonRelativePath, s.relpath, 'sftp://user@host.com/abs/path/sub')
235
 
 
236
 
        # Make sure it works when we don't supply a username
237
 
        s = SFTPTransport('sftp://host.com//abs/path', clone_from=fake)
238
 
        self.assertEquals(s.relpath('sftp://host.com//abs/path/sub'), 'sub')
239
 
 
240
 
        # Make sure it works when parts of the path will be url encoded
241
 
        # TODO: These may be incorrect, we might need to urllib.urlencode() before
242
 
        # we pass the paths into the SFTPTransport constructor
243
 
        s = SFTPTransport('sftp://host.com/dev/,path', clone_from=fake)
244
 
        self.assertEquals(s.relpath('sftp://host.com/dev/,path/sub'), 'sub')
245
 
        s = SFTPTransport('sftp://host.com/dev/%path', clone_from=fake)
246
 
        self.assertEquals(s.relpath('sftp://host.com/dev/%path/sub'), 'sub')
247
 
 
248
 
    def test_parse_invalid_url(self):
249
 
        from bzrlib.transport.sftp import SFTPTransport, SFTPTransportError
250
 
        try:
251
 
            s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
252
 
                              clone_from=fake)
253
 
            self.fail('expected exception not raised')
254
 
        except SFTPTransportError, e:
255
 
            self.assertEquals(str(e), 
256
 
                    '~janneke: invalid port number')
257
 
 
258
 
 
259
 
class SFTPBranchTest(TestCaseWithSFTPServer):
260
 
    """Test some stuff when accessing a bzr Branch over sftp"""
261
 
 
262
 
    def test_lock_file(self):
263
 
        """Make sure that a Branch accessed over sftp tries to lock itself."""
264
 
        from bzrlib.branch import Branch
265
 
 
266
 
        self.delayed_setup()
267
 
        b = Branch.initialize(self._sftp_url)
268
 
        self.failUnlessExists('.bzr/')
269
 
        self.failUnlessExists('.bzr/branch-format')
270
 
        self.failUnlessExists('.bzr/branch-lock')
271
 
 
272
 
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
273
 
        b.lock_write()
274
 
        self.failUnlessExists('.bzr/branch-lock.write-lock')
275
 
        b.unlock()
276
 
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
277
 
 
278
 
 
279
 
if not paramiko_loaded:
280
 
    # TODO: Skip these
281
 
    del SFTPTransportTest
282
 
    del SFTPNonServerTest
283
 
    del SFTPBranchTest