21
import bzrlib.bzrdir as bzrdir
22
import bzrlib.errors as errors
23
from bzrlib.osutils import pathjoin, lexists
24
from bzrlib.tests import TestCaseWithTransport, TestCase, TestSkipped
25
import bzrlib.transport
26
from bzrlib.workingtree import WorkingTree
22
from bzrlib.selftest import TestCaseInTempDir
23
from bzrlib.selftest.testtransport import TestTransportMixIn
27
from stub_sftp import StubServer, StubSFTPServer
30
28
paramiko_loaded = True
31
29
except ImportError:
32
30
paramiko_loaded = False
35
class TestCaseWithSFTPServer(TestCaseWithTransport):
36
"""A test case base class that provides a sftp server on localhost."""
39
if not paramiko_loaded:
40
raise TestSkipped('you must have paramiko to run this test')
41
super(TestCaseWithSFTPServer, self).setUp()
42
from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
43
if getattr(self, '_get_remote_is_absolute', None) is None:
44
self._get_remote_is_absolute = True
45
if self._get_remote_is_absolute:
46
self.transport_server = SFTPAbsoluteServer
48
self.transport_server = SFTPHomeDirServer
49
self.transport_readonly_server = bzrlib.transport.http.HttpServer
51
def get_transport(self, path=None):
52
"""Return a transport relative to self._test_root."""
53
return bzrlib.transport.get_transport(self.get_url(path))
56
class SFTPLockTests (TestCaseWithSFTPServer):
58
def test_sftp_locks(self):
59
from bzrlib.errors import LockError
60
t = self.get_transport()
62
l = t.lock_write('bogus')
63
self.failUnlessExists('bogus.write-lock')
65
# Don't wait for the lock, locking an already locked
66
# file should raise an assert
67
self.assertRaises(LockError, t.lock_write, 'bogus')
70
self.failIf(lexists('bogus.write-lock'))
72
open('something.write-lock', 'wb').write('fake lock\n')
73
self.assertRaises(LockError, t.lock_write, 'something')
74
os.remove('something.write-lock')
76
l = t.lock_write('something')
78
l2 = t.lock_write('bogus')
83
def test_multiple_connections(self):
84
t = self.get_transport()
85
self.assertTrue('sftpserver - new connection' in self.get_server().logs)
86
self.get_server().logs = []
87
# The second request should reuse the first connection
88
# SingleListener only allows for a single connection,
89
# So the next line fails unless the connection is reused
90
t2 = self.get_transport()
91
self.assertEquals(self.get_server().logs, [])
94
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
95
"""Test the SFTP transport with homedir based relative paths."""
97
def test__remote_path(self):
98
t = self.get_transport()
99
# try what is currently used:
100
# remote path = self._abspath(relpath)
101
self.assertEqual(self.test_dir + '/relative', t._remote_path('relative'))
102
# we dont os.path.join because windows gives us the wrong path
103
root_segments = self.test_dir.split('/')
104
root_parent = '/'.join(root_segments[:-1])
105
# .. should be honoured
106
self.assertEqual(root_parent + '/sibling', t._remote_path('../sibling'))
107
# / should be illegal ?
108
### FIXME decide and then test for all transports. RBC20051208
111
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
112
"""Test the SFTP transport with homedir based relative paths."""
115
self._get_remote_is_absolute = False
116
super(SFTPTransportTestRelative, self).setUp()
118
def test__remote_path_relative_root(self):
119
# relative paths are preserved
120
t = self.get_transport('')
121
self.assertEqual('a', t._remote_path('a'))
124
class FakeSFTPTransport (object):
126
fake = FakeSFTPTransport()
129
class SFTPNonServerTest(TestCase):
132
if not paramiko_loaded:
133
raise TestSkipped('you must have paramiko to run this test')
135
def test_parse_url(self):
136
from bzrlib.transport.sftp import SFTPTransport
137
s = SFTPTransport('sftp://simple.example.com/home/source', clone_from=fake)
138
self.assertEquals(s._host, 'simple.example.com')
139
self.assertEquals(s._port, None)
140
self.assertEquals(s._path, '/home/source')
141
self.failUnless(s._password is None)
143
self.assertEquals(s.base, 'sftp://simple.example.com/home/source/')
145
s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative', clone_from=fake)
146
self.assertEquals(s._host, 'example.com')
147
self.assertEquals(s._port, 2222)
148
self.assertEquals(s._username, 'robey')
149
self.assertEquals(s._password, 'h@t')
150
self.assertEquals(s._path, 'relative')
152
# Base should not keep track of the password
153
self.assertEquals(s.base, 'sftp://robey@example.com:2222/~/relative/')
155
def test_relpath(self):
156
from bzrlib.transport.sftp import SFTPTransport
157
from bzrlib.errors import PathNotChild
159
s = SFTPTransport('sftp://user@host.com/abs/path', clone_from=fake)
160
self.assertEquals(s.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
161
# Can't test this one, because we actually get an AssertionError
162
# TODO: Consider raising an exception rather than an assert
163
#self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com/abs/path/sub')
164
self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com/abs/path/sub')
165
self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com/abs/path/sub')
166
self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33/abs/path/sub')
167
self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/~/rel/path/sub')
169
# Make sure it works when we don't supply a username
170
s = SFTPTransport('sftp://host.com/abs/path', clone_from=fake)
171
self.assertEquals(s.relpath('sftp://host.com/abs/path/sub'), 'sub')
173
# Make sure it works when parts of the path will be url encoded
174
# TODO: These may be incorrect, we might need to urllib.urlencode() before
175
# we pass the paths into the SFTPTransport constructor
176
s = SFTPTransport('sftp://host.com/dev/,path', clone_from=fake)
177
self.assertEquals(s.relpath('sftp://host.com/dev/,path/sub'), 'sub')
178
s = SFTPTransport('sftp://host.com/dev/%path', clone_from=fake)
179
self.assertEquals(s.relpath('sftp://host.com/dev/%path/sub'), 'sub')
181
def test_parse_invalid_url(self):
182
from bzrlib.transport.sftp import SFTPTransport, TransportError
184
s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
186
self.fail('expected exception not raised')
187
except TransportError, e:
188
self.assertEquals(str(e),
189
'~janneke: invalid port number')
192
class SFTPBranchTest(TestCaseWithSFTPServer):
193
"""Test some stuff when accessing a bzr Branch over sftp"""
195
def test_lock_file(self):
196
# old format branches use a special lock file on sftp.
197
b = self.make_branch('', format=bzrdir.BzrDirFormat6())
198
b = bzrlib.branch.Branch.open(self.get_url())
199
self.failUnlessExists('.bzr/')
200
self.failUnlessExists('.bzr/branch-format')
201
self.failUnlessExists('.bzr/branch-lock')
203
self.failIf(lexists('.bzr/branch-lock.write-lock'))
205
self.failUnlessExists('.bzr/branch-lock.write-lock')
207
self.failIf(lexists('.bzr/branch-lock.write-lock'))
209
def test_push_support(self):
210
self.build_tree(['a/', 'a/foo'])
211
t = bzrdir.BzrDir.create_standalone_workingtree('a')
214
t.commit('foo', rev_id='a1')
216
b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
219
self.assertEquals(b2.revision_history(), ['a1'])
221
open('a/foo', 'wt').write('something new in foo\n')
222
t.commit('new', rev_id='a2')
225
self.assertEquals(b2.revision_history(), ['a1', 'a2'])
228
class SFTPFullHandshakingTest(TestCaseWithSFTPServer):
229
"""Verify that a full-handshake (SSH over loopback TCP) sftp connection works."""
231
def test_connection(self):
232
from bzrlib.transport.sftp import SFTPFullAbsoluteServer
233
self.transport_server = SFTPFullAbsoluteServer
34
-----BEGIN RSA PRIVATE KEY-----
35
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
36
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
37
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
38
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
39
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
40
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
41
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
42
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
43
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
44
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
45
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
46
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
47
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
48
-----END RSA PRIVATE KEY-----
52
class SingleListener (threading.Thread):
53
def __init__(self, callback):
54
threading.Thread.__init__(self)
55
self._callback = callback
56
self._socket = socket.socket()
57
self._socket.listen(1)
58
self.port = self._socket.getsockname()[1]
59
self.stop_event = threading.Event()
62
s, _ = self._socket.accept()
63
# now close the listen socket
65
self._callback(s, self.stop_event)
71
class TestCaseWithSFTPServer (TestCaseInTempDir):
73
Execute a test case with a stub SFTP server, serving files from the local
74
filesystem over the loopback network.
77
def _run_server(self, s, stop_event):
78
ssh_server = paramiko.Transport(s)
79
key_file = os.path.join(self._root, 'test_rsa.key')
80
file(key_file, 'w').write(STUB_SERVER_KEY)
81
host_key = paramiko.RSAKey.from_private_key_file(key_file)
82
ssh_server.add_server_key(host_key)
84
ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
85
event = threading.Event()
86
ssh_server.start_server(event, server)
91
TestCaseInTempDir.setUp(self)
92
self._root = self.test_dir
94
self._listener = SingleListener(self._run_server)
95
self._listener.setDaemon(True)
96
self._listener.start()
97
self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
100
self._listener.stop()
101
TestCaseInTempDir.tearDown(self)
104
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
107
def get_transport(self):
108
from bzrlib.transport.sftp import SFTPTransport
110
return SFTPTransport(url)
112
if not paramiko_loaded:
113
del SFTPTransportTest