1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21
from bzrlib.tests import TestCaseInTempDir, TestCase
22
from bzrlib.tests.test_transport import TestTransportMixIn
23
import bzrlib.errors as errors
27
from stub_sftp import StubServer, StubSFTPServer
28
# Module-level flag recording whether the paramiko-based test helpers could
# be loaded; it is checked at the bottom of this file to remove test classes.
# NOTE(review): as shown, the flag is set to True and then straight to False.
# Presumably the two assignments belong to the two branches of an import
# guard that is not visible in this view -- confirm against the full file.
paramiko_loaded = True
30
paramiko_loaded = False
32
# XXX: 20051124 jamesh
33
# The tests currently pop up a password prompt when an external ssh
34
# is used. Setting the ssh vendor to 'none' below forces the use of the
# paramiko implementation.
36
import bzrlib.transport.sftp
37
bzrlib.transport.sftp._ssh_vendor = 'none'
41
-----BEGIN RSA PRIVATE KEY-----
42
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
43
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
44
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
45
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
46
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
47
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
48
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
49
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
50
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
51
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
52
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
53
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
54
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
55
-----END RSA PRIVATE KEY-----
59
# Thread subclass that opens a listening socket on localhost, accepts a
# single connection and hands the connected socket to a callback.
# NOTE(review): the method headers that should enclose the accept/callback
# statements and the trailing comment are not visible in this view.
class SingleListener (threading.Thread):
60
# callback: callable that is later invoked as
# callback(connected_socket, stop_event).
def __init__(self, callback):
61
threading.Thread.__init__(self)
62
self._callback = callback
63
self._socket = socket.socket()
64
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
65
# Port 0 lets the OS pick a free port; the chosen number is read back
# from the socket below and published as self.port.
self._socket.bind(('localhost', 0))
66
# A backlog of 1 is enough: only one accept() call is made.
self._socket.listen(1)
67
self.port = self._socket.getsockname()[1]
68
# Event passed to the callback together with the accepted socket.
self.stop_event = threading.Event()
71
# Blocks until a client connects; the peer address is discarded.
s, _ = self._socket.accept()
72
# now close the listen socket
# NOTE(review): the close call announced above is not visible here.
74
self._callback(s, self.stop_event)
78
# We should consider waiting for the other thread
79
# to stop, because otherwise we get spurious
80
# bzr: ERROR: Socket exception: Connection reset by peer (54)
81
# because the test suite finishes before the thread has a chance
82
# to close. (Especially when only running a few tests)
85
# Base class for tests that need an SFTP server: builds on TestCaseInTempDir
# and starts a SingleListener-backed stub server on demand (delayed_setup).
class TestCaseWithSFTPServer (TestCaseInTempDir):
87
Execute a test case with a stub SFTP server, serving files from the local
88
filesystem over the loopback network.
91
# Callback given to SingleListener (see delayed_setup).
#   s          -- the connected socket to run the SSH server transport on
#   stop_event -- event waited on at the end, for at most 30 seconds
def _run_server(self, s, stop_event):
92
ssh_server = paramiko.Transport(s)
93
# The host key is written to a file under the test root and then loaded
# back from that file by paramiko.
key_file = os.path.join(self._root, 'test_rsa.key')
94
# NOTE(review): the file object is never closed explicitly, so the key is
# only guaranteed to be on disk once the object is collected -- consider
# an explicit close before it is read back on the next line.
file(key_file, 'w').write(STUB_SERVER_KEY)
95
host_key = paramiko.RSAKey.from_private_key_file(key_file)
96
ssh_server.add_server_key(host_key)
97
server = StubServer(self)
98
# Serve the 'sftp' subsystem with StubSFTPServer rooted at the test dir.
ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
99
event = threading.Event()
100
ssh_server.start_server(event, server)
102
# Wait until the stop event is set, but no longer than 30 seconds.
stop_event.wait(30.0)
105
# NOTE(review): the three statements below look like the body of setUp;
# its 'def' line is not visible in this view.
TestCaseInTempDir.setUp(self)
106
self._root = self.test_dir
107
# No listener yet; delayed_setup flips this to True once it has run.
self._is_setup = False
109
# Start the listener thread and compute the sftp:// URL that points at it.
def delayed_setup(self):
110
# some tests are just stubs that call setUp and then immediately call
111
# tearDown. so don't create the port listener until get_transport is
112
# called and we know we're in an actual test.
115
self._listener = SingleListener(self._run_server)
116
# Daemon thread: it does not keep the interpreter alive at exit.
self._listener.setDaemon(True)
117
self._listener.start()
118
# URL with fixed credentials foo:bar and the listener's port.
self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
119
self._is_setup = True
123
# NOTE(review): the lines below look like the body of tearDown; its 'def'
# and 'try:' lines are not visible. Presumably the AttributeError handler
# covers tests where delayed_setup never ran and self._listener is unset.
self._listener.stop()
124
except AttributeError:
126
TestCaseInTempDir.tearDown(self)
129
# Transport tests run against the stub SFTP server provided by
# TestCaseWithSFTPServer, combined with TestTransportMixIn.
# NOTE(review): log() appends to self.sftplogs and test_multiple_connections
# compares it, so it has to be initialised somewhere (presumably in setUp);
# that statement and the setUp 'def' line are not visible in this view.
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
133
TestCaseWithSFTPServer.setUp(self)
136
def log(self, *args):
    """Forward a log call to the base class and keep SFTP server messages.

    The call is always passed on to ``TestCaseWithSFTPServer.log``.  When a
    first positional argument is present and starts with ``'sftpserver'``,
    it is additionally appended to ``self.sftplogs`` so that tests can
    inspect what the stub server reported.
    """
    TestCaseWithSFTPServer.log(self, *args)
    if not args:
        return
    first = args[0]
    if first.startswith('sftpserver'):
        self.sftplogs.append(first)
142
# Return an SFTPTransport for the test server.
# NOTE(review): 'url' is not assigned in the lines visible here; presumably
# the missing lines call delayed_setup() and read self._sftp_url -- confirm.
def get_transport(self):
144
# Imported locally rather than at module level.
from bzrlib.transport.sftp import SFTPTransport
146
return SFTPTransport(url)
148
# Check that lock_write() creates a '<name>.write-lock' file and that a name
# whose lock file already exists cannot be locked again.
# NOTE(review): the steps that release the locks between the assertions are
# not visible in this view.
def test_sftp_locks(self):
149
from bzrlib.errors import LockError
150
t = self.get_transport()
152
l = t.lock_write('bogus')
153
self.failUnlessExists('bogus.write-lock')
155
# Don't wait for the lock, locking an already locked
156
# file should raise LockError
157
self.assertRaises(LockError, t.lock_write, 'bogus')
160
# The lock file is expected to be gone again at this point.
self.failIf(os.path.lexists('bogus.write-lock'))
162
# A lock file created behind the transport's back also blocks locking.
open('something.write-lock', 'wb').write('fake lock\n')
163
self.assertRaises(LockError, t.lock_write, 'something')
164
os.remove('something.write-lock')
166
# With the stale file removed, taking the lock succeeds.
l = t.lock_write('something')
168
l2 = t.lock_write('bogus')
173
# Check that a second transport reuses the first connection: the server logs
# an authorization and a channel request for the first one and nothing for
# the second.
def test_multiple_connections(self):
174
t = self.get_transport()
175
self.assertEquals(self.sftplogs,
176
['sftpserver - authorizing: foo'
177
, 'sftpserver - channel request: session, 1'])
# NOTE(review): the final assertion expects an empty log, so self.sftplogs
# is presumably reset at this point; that statement is not visible here.
179
# The second request should reuse the first connection
180
# SingleListener only allows for a single connection,
181
# So the next line fails unless the connection is reused
182
t2 = self.get_transport()
183
self.assertEquals(self.sftplogs, [])
186
# Stand-in object passed as clone_from= to SFTPTransport in the tests below.
# NOTE(review): the class body is not visible in this view.
class FakeSFTPTransport (object):
188
# Single shared instance used by the SFTPNonServerTest tests.
fake = FakeSFTPTransport()
191
# URL parsing and relpath tests for SFTPTransport; every transport here is
# built with clone_from=fake instead of going through the stub server.
class SFTPNonServerTest(TestCase):
192
# Check how SFTPTransport splits sftp:// URLs into host, port, username,
# password and path, and how it rebuilds its 'base' URL.
def test_parse_url(self):
193
from bzrlib.transport.sftp import SFTPTransport
194
# A leading %2f in the path stands for '/', i.e. an absolute path.
s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
195
self.assertEquals(s._host, 'simple.example.com')
196
self.assertEquals(s._port, None)
197
self.assertEquals(s._path, '/home/source')
198
self.failUnless(s._password is None)
200
# base re-encodes the leading slash, using an upper-case %2F.
self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source')
202
# Percent-escapes in the user info are decoded: %62 -> 'b', %40 -> '@'.
s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
203
self.assertEquals(s._host, 'example.com')
204
self.assertEquals(s._port, 2222)
205
self.assertEquals(s._username, 'robey')
206
self.assertEquals(s._password, 'h@t')
207
self.assertEquals(s._path, 'relative')
209
# Base should not keep track of the password
210
self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative')
212
# Double slash should be accepted instead of using %2F
213
s = SFTPTransport('sftp://user@example.com:22//absolute/path', clone_from=fake)
214
self.assertEquals(s._host, 'example.com')
215
self.assertEquals(s._port, 22)
216
self.assertEquals(s._username, 'user')
217
self.assertEquals(s._password, None)
218
self.assertEquals(s._path, '/absolute/path')
220
# Also, don't show the port if it is the default 22
# NOTE(review): the assertion below nevertheless expects ':22' in base;
# confirm whether the comment or the expectation is the intended behaviour.
221
self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path')
223
# Check SFTPTransport.relpath(): a URL below the transport's base yields the
# relative part, while a different user, host, port or path root raises
# PathNotChild.
def test_relpath(self):
224
from bzrlib.transport.sftp import SFTPTransport
225
from bzrlib.errors import PathNotChild
227
s = SFTPTransport('sftp://user@host.com//abs/path', clone_from=fake)
228
self.assertEquals(s.relpath('sftp://user@host.com//abs/path/sub'), 'sub')
229
# Can't test this one, because we actually get an AssertionError
230
# TODO: Consider raising an exception rather than an assert
231
#self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com//abs/path/sub')
232
# Different username.
self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com//abs/path/sub')
233
# Different host.
self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com//abs/path/sub')
234
# Explicit, different port.
self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33//abs/path/sub')
235
# Single slash instead of the double slash used in the transport's URL.
self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/abs/path/sub')
237
# Make sure it works when we don't supply a username
238
s = SFTPTransport('sftp://host.com//abs/path', clone_from=fake)
239
self.assertEquals(s.relpath('sftp://host.com//abs/path/sub'), 'sub')
241
# Make sure it works when parts of the path will be url encoded
242
# TODO: These may be incorrect, we might need to urllib.urlencode() before
243
# we pass the paths into the SFTPTransport constructor
244
s = SFTPTransport('sftp://host.com/dev/,path', clone_from=fake)
245
self.assertEquals(s.relpath('sftp://host.com/dev/,path/sub'), 'sub')
246
s = SFTPTransport('sftp://host.com/dev/%path', clone_from=fake)
247
self.assertEquals(s.relpath('sftp://host.com/dev/%path/sub'), 'sub')
249
# Check that a URL whose port part is not a number ('~janneke') makes the
# SFTPTransport constructor raise TransportError with a fixed message.
# NOTE(review): the 'try:' line and the end of the constructor call are not
# visible in this view.
def test_parse_invalid_url(self):
250
from bzrlib.transport.sftp import SFTPTransport, TransportError
252
s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
254
# Only reached when the constructor did not raise.
self.fail('expected exception not raised')
255
except TransportError, e:
256
self.assertEquals(str(e),
257
'~janneke: invalid port number')
260
# Branch-level tests that create branches through the stub server's URL.
class SFTPBranchTest(TestCaseWithSFTPServer):
261
"""Test some stuff when accessing a bzr Branch over sftp"""
263
def test_lock_file(self):
264
"""Make sure that a Branch accessed over sftp tries to lock itself."""
265
from bzrlib.branch import Branch
268
b = Branch.initialize(self._sftp_url)
269
# Initializing the branch must have created the control files.
self.failUnlessExists('.bzr/')
270
self.failUnlessExists('.bzr/branch-format')
271
self.failUnlessExists('.bzr/branch-lock')
273
# The write-lock file is expected absent, then present, then absent.
# NOTE(review): the lock and unlock calls that should sit between these
# three checks are not visible in this view.
self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
275
self.failUnlessExists('.bzr/branch-lock.write-lock')
277
self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
279
# A branch created over sftp must refuse to hand out a working tree.
def test_no_working_tree(self):
280
from bzrlib.branch import Branch
282
b = Branch.initialize(self._sftp_url)
283
self.assertRaises(errors.NoWorkingTree, b.working_tree)
285
# Commit revision 'a1' in a local branch 'a' and check that the branch 'b'
# created over sftp ends up with that revision in its history.
# NOTE(review): the definition of 't' and the step that transfers the
# revision to b2 are not visible in this view.
def test_push_support(self):
286
from bzrlib.branch import Branch
289
self.build_tree(['a/', 'a/foo'])
290
b = Branch.initialize('a')
293
t.commit('foo', rev_id='a1')
296
b2 = Branch.initialize(self._sftp_url + 'b')
299
self.assertEquals(b2.revision_history(), ['a1'])
302
# When the paramiko-based helpers are unavailable, remove the test classes
# from the module namespace (presumably so the test loader skips them).
# NOTE(review): SFTPBranchTest also derives from TestCaseWithSFTPServer but
# is not deleted in the lines visible here -- check the rest of the file.
if not paramiko_loaded:
304
del SFTPTransportTest
305
del SFTPNonServerTest