1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21
from bzrlib.tests import TestCaseInTempDir, TestCase
22
from bzrlib.tests.test_transport import TestTransportMixIn
23
import bzrlib.errors as errors
24
from bzrlib.osutils import pathjoin, lexists
28
from stub_sftp import StubServer, StubSFTPServer
29
paramiko_loaded = True
31
paramiko_loaded = False
33
# XXX: 20051124 jamesh
34
# The tests currently pop up a password prompt when an external ssh
35
# is used. This forces the use of the paramiko implementation.
37
import bzrlib.transport.sftp
38
bzrlib.transport.sftp._ssh_vendor = 'none'
42
-----BEGIN RSA PRIVATE KEY-----
43
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
44
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
45
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
46
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
47
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
48
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
49
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
50
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
51
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
52
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
53
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
54
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
55
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
56
-----END RSA PRIVATE KEY-----
60
class SingleListener (threading.Thread):
61
def __init__(self, callback):
    """Create the listening socket and remember the connection callback.

    :param callback: stored as ``self._callback`` for use once a
        connection has been accepted.

    The socket is bound to ``('localhost', 0)`` and put into listening
    mode with a backlog of 1.  The port reported by ``getsockname()`` is
    published as ``self.port`` and a new ``threading.Event`` is exposed
    as ``self.stop_event``.
    """
    threading.Thread.__init__(self)
    self._callback = callback

    # Keep a short local alias while the socket is being configured.
    listener = self._socket = socket.socket()
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(('localhost', 0))
    listener.listen(1)

    bound_address = listener.getsockname()
    self.port = bound_address[1]
    self.stop_event = threading.Event()
72
s, _ = self._socket.accept()
73
# now close the listen socket
75
self._callback(s, self.stop_event)
79
# We should consider waiting for the other thread
80
# to stop, because otherwise we get spurious
81
# bzr: ERROR: Socket exception: Connection reset by peer (54)
82
# because the test suite finishes before the thread has a chance
83
# to close. (Especially when only running a few tests)
86
class TestCaseWithSFTPServer (TestCaseInTempDir):
88
Execute a test case with a stub SFTP server, serving files from the local
89
filesystem over the loopback network.
92
def _run_server(self, s, stop_event):
    """Run a paramiko SSH/SFTP server on an accepted connection.

    :param s: the connected socket, handed to ``paramiko.Transport``.
    :param stop_event: event that is waited on, for at most 30 seconds,
        before this method returns.

    The stub host key is written to ``test_rsa.key`` under ``self._root``
    and loaded back from that file, and the ``sftp`` subsystem is served
    by ``StubSFTPServer`` rooted at ``self._root``.
    """
    ssh_server = paramiko.Transport(s)
    key_file = pathjoin(self._root, 'test_rsa.key')
    # Close the key file explicitly so that its contents are flushed to
    # disk before paramiko reads the file back; relying on garbage
    # collection to close the handle is not guaranteed to do that in time.
    key_out = open(key_file, 'w')
    try:
        key_out.write(STUB_SERVER_KEY)
    finally:
        key_out.close()
    host_key = paramiko.RSAKey.from_private_key_file(key_file)
    ssh_server.add_server_key(host_key)
    server = StubServer(self)
    ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
                                     StubSFTPServer, root=self._root)
    event = threading.Event()
    ssh_server.start_server(event, server)
    stop_event.wait(30.0)
106
TestCaseInTempDir.setUp(self)
107
self._root = self.test_dir
108
self._is_setup = False
110
def delayed_setup(self):
    """Start the listener thread and record the sftp URL that reaches it.

    Sets ``self._listener``, ``self._sftp_url`` and marks the test as set
    up through ``self._is_setup``.
    """
    # Some tests are only stubs that call setUp and then immediately call
    # tearDown, so the port listener is not created until get_transport
    # is called and we know we are in an actual test.
    listener = self._listener = SingleListener(self._run_server)
    listener.setDaemon(True)
    listener.start()
    self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (listener.port,)
    self._is_setup = True
124
self._listener.stop()
125
except AttributeError:
127
TestCaseInTempDir.tearDown(self)
130
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
134
TestCaseWithSFTPServer.setUp(self)
137
def log(self, *args):
    """Log normally, and also keep any message from the sftp server.

    The call is forwarded to ``TestCaseWithSFTPServer.log``.  When the
    first positional argument starts with ``'sftpserver'`` it is appended
    to ``self.sftplogs`` as well.
    """
    TestCaseWithSFTPServer.log(self, *args)
    if not args:
        return
    message = args[0]
    if message.startswith('sftpserver'):
        self.sftplogs.append(message)
143
def get_transport(self):
    """Return an ``SFTPTransport`` for the URL of the stub server.

    The listener is only started on demand: if ``delayed_setup`` has not
    run yet for this test it is called here, which also assigns
    ``self._sftp_url``.
    """
    # The URL only exists once delayed_setup() has started the listener,
    # so make sure that has happened before reading it.
    if not self._is_setup:
        self.delayed_setup()
    from bzrlib.transport.sftp import SFTPTransport
    url = self._sftp_url
    return SFTPTransport(url)
149
def test_sftp_locks(self):
150
from bzrlib.errors import LockError
151
t = self.get_transport()
153
l = t.lock_write('bogus')
154
self.failUnlessExists('bogus.write-lock')
156
# Don't wait for the lock, locking an already locked
157
# file should raise an assert
158
self.assertRaises(LockError, t.lock_write, 'bogus')
161
self.failIf(lexists('bogus.write-lock'))
163
open('something.write-lock', 'wb').write('fake lock\n')
164
self.assertRaises(LockError, t.lock_write, 'something')
165
os.remove('something.write-lock')
167
l = t.lock_write('something')
169
l2 = t.lock_write('bogus')
174
def test_multiple_connections(self):
    """A second transport must reuse the first connection."""
    t = self.get_transport()
    self.assertEquals(self.sftplogs,
            ['sftpserver - authorizing: foo'
           , 'sftpserver - channel request: session, 1'])
    # Forget the messages from the first connection so that the final
    # assertion only sees what the second request logs.
    self.sftplogs = []
    # The second request should reuse the first connection.
    # SingleListener only allows for a single connection,
    # so the next line fails unless the connection is reused.
    t2 = self.get_transport()
    self.assertEquals(self.sftplogs, [])
187
class FakeSFTPTransport (object):
189
fake = FakeSFTPTransport()
192
class SFTPNonServerTest(TestCase):
193
def test_parse_url(self):
    """Check the host, port, credentials, path and base parsed from URLs."""
    from bzrlib.transport.sftp import SFTPTransport

    # Escaped leading slash (%2f): absolute path, no port, no password.
    transport = SFTPTransport('sftp://simple.example.com/%2fhome/source',
                              clone_from=fake)
    self.assertEquals(transport._host, 'simple.example.com')
    self.assertEquals(transport._port, None)
    self.assertEquals(transport._path, '/home/source')
    self.failUnless(transport._password is None)
    self.assertEquals(transport.base,
                      'sftp://simple.example.com/%2Fhome/source')

    # Percent-escaped user name and password, explicit port, relative path.
    transport = SFTPTransport(
        'sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
    self.assertEquals(transport._host, 'example.com')
    self.assertEquals(transport._port, 2222)
    self.assertEquals(transport._username, 'robey')
    self.assertEquals(transport._password, 'h@t')
    self.assertEquals(transport._path, 'relative')
    # Base should not keep track of the password.
    self.assertEquals(transport.base,
                      'sftp://robey@example.com:2222/relative')

    # A double slash should be accepted instead of using %2F.
    transport = SFTPTransport('sftp://user@example.com:22//absolute/path',
                              clone_from=fake)
    self.assertEquals(transport._host, 'example.com')
    self.assertEquals(transport._port, 22)
    self.assertEquals(transport._username, 'user')
    self.assertEquals(transport._password, None)
    self.assertEquals(transport._path, '/absolute/path')
    # NOTE(review): the earlier comment here said the port should not be
    # shown when it is the default 22, yet the expected base below still
    # contains ':22' -- confirm which behaviour is intended.
    self.assertEquals(transport.base,
                      'sftp://user@example.com:22/%2Fabsolute/path')
224
def test_relpath(self):
    """Check relpath() against matching and non-matching sftp URLs."""
    from bzrlib.transport.sftp import SFTPTransport
    from bzrlib.errors import NonRelativePath

    transport = SFTPTransport('sftp://user@host.com//abs/path',
                              clone_from=fake)
    self.assertEquals(
        transport.relpath('sftp://user@host.com//abs/path/sub'), 'sub')

    # An 'http://user@host.com//abs/path/sub' URL is not checked here
    # because we actually get an AssertionError for it.
    # TODO: Consider raising an exception rather than an assert
    # A different user, host, port or path must each be rejected.
    unrelated_urls = (
        'sftp://user2@host.com//abs/path/sub',
        'sftp://user@otherhost.com//abs/path/sub',
        'sftp://user@host.com:33//abs/path/sub',
        'sftp://user@host.com/abs/path/sub',
    )
    for unrelated in unrelated_urls:
        self.assertRaises(NonRelativePath, transport.relpath, unrelated)

    # Make sure it works when we don't supply a username.
    transport = SFTPTransport('sftp://host.com//abs/path', clone_from=fake)
    self.assertEquals(
        transport.relpath('sftp://host.com//abs/path/sub'), 'sub')

    # Make sure it works when parts of the path will be url encoded.
    # TODO: These may be incorrect, we might need to urllib.urlencode()
    # before we pass the paths into the SFTPTransport constructor
    transport = SFTPTransport('sftp://host.com/dev/,path', clone_from=fake)
    self.assertEquals(
        transport.relpath('sftp://host.com/dev/,path/sub'), 'sub')
    transport = SFTPTransport('sftp://host.com/dev/%path', clone_from=fake)
    self.assertEquals(
        transport.relpath('sftp://host.com/dev/%path/sub'), 'sub')
250
def test_parse_invalid_url(self):
251
from bzrlib.transport.sftp import SFTPTransport, SFTPTransportError
253
s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
255
self.fail('expected exception not raised')
256
except SFTPTransportError, e:
257
self.assertEquals(str(e),
258
'~janneke: invalid port number')
261
class SFTPBranchTest(TestCaseWithSFTPServer):
262
"""Test some stuff when accessing a bzr Branch over sftp"""
264
def test_lock_file(self):
265
"""Make sure that a Branch accessed over sftp tries to lock itself."""
266
from bzrlib.branch import Branch
269
b = Branch.initialize(self._sftp_url)
270
self.failUnlessExists('.bzr/')
271
self.failUnlessExists('.bzr/branch-format')
272
self.failUnlessExists('.bzr/branch-lock')
274
self.failIf(lexists('.bzr/branch-lock.write-lock'))
276
self.failUnlessExists('.bzr/branch-lock.write-lock')
278
self.failIf(lexists('.bzr/branch-lock.write-lock'))
280
def test_no_working_tree(self):
    """A branch initialized over sftp must not offer a working tree."""
    from bzrlib.branch import Branch

    # self._sftp_url is only assigned by delayed_setup(), so make sure the
    # listener has been started before the URL is used.
    if not self._is_setup:
        self.delayed_setup()
    b = Branch.initialize(self._sftp_url)
    self.assertRaises(errors.NoWorkingTree, b.working_tree)
286
def test_push_support(self):
287
from bzrlib.branch import Branch
290
self.build_tree(['a/', 'a/foo'])
291
b = Branch.initialize('a')
294
t.commit('foo', rev_id='a1')
297
b2 = Branch.initialize(self._sftp_url + 'b')
300
self.assertEquals(b2.revision_history(), ['a1'])
303
if not paramiko_loaded:
    # Remove the test classes from the module namespace when paramiko is
    # not available.
    # NOTE(review): SFTPBranchTest also runs against the paramiko-backed
    # stub server but is not removed here -- confirm whether it should be.
    del SFTPTransportTest, SFTPNonServerTest