1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22
from bzrlib.selftest import TestCaseInTempDir
23
from bzrlib.selftest.testtransport import TestTransportMixIn
27
from stub_sftp import StubServer, StubSFTPServer
28
paramiko_loaded = True
30
paramiko_loaded = False
34
-----BEGIN RSA PRIVATE KEY-----
35
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
36
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
37
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
38
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
39
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
40
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
41
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
42
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
43
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
44
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
45
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
46
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
47
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
48
-----END RSA PRIVATE KEY-----
52
class SingleListener (threading.Thread):
53
def __init__(self, callback):
54
threading.Thread.__init__(self)
55
self._callback = callback
56
self._socket = socket.socket()
57
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
58
self._socket.bind(('localhost', 0))
59
self._socket.listen(1)
60
self.port = self._socket.getsockname()[1]
61
self.stop_event = threading.Event()
64
s, _ = self._socket.accept()
65
# now close the listen socket
67
self._callback(s, self.stop_event)
73
class TestCaseWithSFTPServer (TestCaseInTempDir):
75
Execute a test case with a stub SFTP server, serving files from the local
76
filesystem over the loopback network.
79
def _run_server(self, s, stop_event):
80
ssh_server = paramiko.Transport(s)
81
key_file = os.path.join(self._root, 'test_rsa.key')
82
file(key_file, 'w').write(STUB_SERVER_KEY)
83
host_key = paramiko.RSAKey.from_private_key_file(key_file)
84
ssh_server.add_server_key(host_key)
86
ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
87
event = threading.Event()
88
ssh_server.start_server(event, server)
93
TestCaseInTempDir.setUp(self)
94
self._root = self.test_dir
96
def delayed_setup(self):
97
# some tests are just stubs that call setUp and then immediately call
98
# tearDwon. so don't create the port listener until get_transport is
99
# called and we know we're in an actual test.
100
self._listener = SingleListener(self._run_server)
101
self._listener.setDaemon(True)
102
self._listener.start()
103
self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
107
self._listener.stop()
108
except AttributeError:
110
TestCaseInTempDir.tearDown(self)
113
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
117
def get_transport(self):
121
from bzrlib.transport.sftp import SFTPTransport
123
return SFTPTransport(url)
125
def test_sftp_locks(self):
126
from bzrlib.errors import LockError
127
t = self.get_transport()
129
l = t.lock_write('bogus')
130
self.failUnlessExists('bogus.write-lock')
132
# Don't wait for the lock, locking an already locked
133
# file should raise an assert
134
self.assertRaises(LockError, t.lock_write, 'bogus')
137
self.failIf(os.path.lexists('bogus.write-lock'))
139
open('something.write-lock', 'wb').write('fake lock\n')
140
self.assertRaises(LockError, t.lock_write, 'something')
141
os.remove('something.write-lock')
143
l = t.lock_write('something')
145
l2 = t.lock_write('bogus')
151
class FakeSFTPTransport (object):
153
fake = FakeSFTPTransport()
156
class SFTPNonServerTest (unittest.TestCase):
157
def test_parse_url(self):
158
from bzrlib.transport.sftp import SFTPTransport
159
s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
160
self.assertEquals(s._host, 'simple.example.com')
161
self.assertEquals(s._port, 22)
162
self.assertEquals(s._path, '/home/source')
163
self.assert_(s._password is None)
165
s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
166
self.assertEquals(s._host, 'example.com')
167
self.assertEquals(s._port, 2222)
168
self.assertEquals(s._username, 'robey')
169
self.assertEquals(s._password, 'h@t')
170
self.assertEquals(s._path, 'relative')
173
class SFTPBranchTest(TestCaseWithSFTPServer):
174
"""Test some stuff when accessing a bzr Branch over sftp"""
176
def test_lock_file(self):
177
"""Make sure that a Branch accessed over sftp tries to lock itself."""
178
from bzrlib.branch import Branch
181
b = Branch.initialize(self._sftp_url)
182
self.failUnlessExists('.bzr/')
183
self.failUnlessExists('.bzr/branch-format')
184
self.failUnlessExists('.bzr/branch-lock')
186
self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
188
self.failUnlessExists('.bzr/branch-lock.write-lock')
190
self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
193
if not paramiko_loaded:
195
del SFTPTransportTest
196
del SFTPNonServerTest