1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
2
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
4
3
# This program is free software; you can redistribute it and/or modify
5
4
# it under the terms of the GNU General Public License as published by
6
5
# the Free Software Foundation; either version 2 of the License, or
7
6
# (at your option) any later version.
9
8
# This program is distributed in the hope that it will be useful,
10
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
11
# GNU General Public License for more details.
14
13
# You should have received a copy of the GNU General Public License
15
14
# along with this program; if not, write to the Free Software
16
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22
from bzrlib.selftest import TestCaseInTempDir
23
from bzrlib.selftest.testtransport import TestTransportMixIn
27
from stub_sftp import StubServer, StubSFTPServer
26
28
paramiko_loaded = True
27
29
except ImportError:
28
30
paramiko_loaded = False
34
from bzrlib.osutils import (
39
from bzrlib.tests import (
40
TestCaseWithTransport,
44
from bzrlib.tests.http_server import HttpServer
45
from bzrlib.transport import get_transport
46
import bzrlib.transport.http
49
from bzrlib.transport.sftp import (
55
from bzrlib.workingtree import WorkingTree
58
def set_test_transport_to_sftp(testcase):
59
"""A helper to set transports on test case instances."""
60
if getattr(testcase, '_get_remote_is_absolute', None) is None:
61
testcase._get_remote_is_absolute = True
62
if testcase._get_remote_is_absolute:
63
testcase.transport_server = SFTPAbsoluteServer
65
testcase.transport_server = SFTPHomeDirServer
66
testcase.transport_readonly_server = HttpServer
69
class TestCaseWithSFTPServer(TestCaseWithTransport):
70
"""A test case base class that provides a sftp server on localhost."""
73
super(TestCaseWithSFTPServer, self).setUp()
74
if not paramiko_loaded:
75
raise TestSkipped('you must have paramiko to run this test')
76
set_test_transport_to_sftp(self)
79
class SFTPLockTests (TestCaseWithSFTPServer):
81
def test_sftp_locks(self):
82
from bzrlib.errors import LockError
83
t = self.get_transport()
85
l = t.lock_write('bogus')
86
self.failUnlessExists('bogus.write-lock')
88
# Don't wait for the lock, locking an already locked
89
# file should raise an assert
90
self.assertRaises(LockError, t.lock_write, 'bogus')
93
self.failIf(lexists('bogus.write-lock'))
95
open('something.write-lock', 'wb').write('fake lock\n')
96
self.assertRaises(LockError, t.lock_write, 'something')
97
os.remove('something.write-lock')
99
l = t.lock_write('something')
101
l2 = t.lock_write('bogus')
107
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
108
"""Test the SFTP transport with homedir based relative paths."""
110
def test__remote_path(self):
    """Check how _remote_path maps relative names onto the test directory."""
    if sys.platform == 'darwin':
        # What is exercised here is sftp absolute path handling;
        # self.test_dir is merely a handy absolute path to compare with.
        # On OSX the sftp server does not resolve symlinks (nor does it
        # have to), so that comparison fails there.  The win32 TODO below
        # is a related limitation.  --vila 20070924
        self.knownFailure('Mac OSX symlinks /tmp to /private/tmp,'
                          ' testing against self.test_dir'
                          ' is not appropriate')
    transport = self.get_transport()
    # A unix-like absolute path is required for the comparisons below.
    base_dir = self.test_dir
    if sys.platform == 'win32':
        # Hack suggested by John Meinel.
        # TODO: write another mock server for this test
        #       and use absolute path without drive letter
        base_dir = '/' + base_dir
    # try what is currently used:
    # remote path = self._abspath(relpath)
    expected_child = base_dir + '/relative'
    self.assertIsSameRealPath(expected_child,
                              transport._remote_path('relative'))
    # The parent is built by hand: os.path.join gives the wrong path on
    # windows.
    parent_dir = '/'.join(base_dir.split('/')[:-1])
    # .. should be honoured
    expected_sibling = parent_dir + '/sibling'
    self.assertIsSameRealPath(expected_sibling,
                              transport._remote_path('../sibling'))
    # / should be illegal ?
    ### FIXME decide and then test for all transports. RBC20051208
143
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
144
"""Test the SFTP transport with homedir based relative paths."""
147
# Only SFTPHomeDirServer is tested here
148
self._get_remote_is_absolute = False
149
super(SFTPTransportTestRelativeRoot, self).setUp()
151
def test__remote_path_relative_root(self):
    """A transport opened on '' keeps its remote paths relative."""
    home_transport = self.get_transport('')
    # relative paths are preserved
    self.assertEqual('/~/', home_transport._path)
    # the remote path should be relative to home dir
    # (i.e. not beginning with a '/')
    remote = home_transport._remote_path('a')
    self.assertEqual('a', remote)
160
class SFTPNonServerTest(TestCase):
163
if not paramiko_loaded:
164
raise TestSkipped('you must have paramiko to run this test')
166
def test_parse_url_with_home_dir(self):
    """Check the pieces SFTPTransport extracts from a '/~/' URL.

    The user name and password in the URL are percent-encoded
    ('ro%62ey', 'h%40t'); the assertions expect them decoded, and expect
    the stored path to carry a trailing slash.
    """
    s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative')
    # assertEqual rather than the deprecated assertEquals alias, matching
    # the other tests in this file.
    self.assertEqual(s._host, 'example.com')
    self.assertEqual(s._port, 2222)
    self.assertEqual(s._user, 'robey')
    self.assertEqual(s._password, 'h@t')
    self.assertEqual(s._path, '/~/relative/')
174
def test_relpath(self):
    """relpath() on an absolute-path transport rejects a '/~/' URL."""
    transport = SFTPTransport('sftp://user@host.com/abs/path')
    outside_url = 'sftp://user@host.com/~/rel/path/sub'
    self.assertRaises(errors.PathNotChild, transport.relpath, outside_url)
179
def test_get_paramiko_vendor(self):
180
"""Test that if no 'ssh' is available we get builtin paramiko"""
181
from bzrlib.transport import ssh
182
# set '.' as the only location in the path, forcing no 'ssh' to exist
183
orig_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
184
orig_path = set_or_unset_env('PATH', '.')
186
# No vendor defined yet, query for one
187
ssh._ssh_vendor_manager.clear_cache()
188
vendor = ssh._get_ssh_vendor()
189
self.assertIsInstance(vendor, ssh.ParamikoVendor)
191
set_or_unset_env('PATH', orig_path)
192
ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
194
def test_abspath_root_sibling_server(self):
195
from bzrlib.transport.sftp import SFTPSiblingAbsoluteServer
196
server = SFTPSiblingAbsoluteServer()
199
transport = get_transport(server.get_url())
200
self.assertFalse(transport.abspath('/').endswith('/~/'))
201
self.assertTrue(transport.abspath('/').endswith('/'))
207
class SFTPBranchTest(TestCaseWithSFTPServer):
208
"""Test some stuff when accessing a bzr Branch over sftp"""
210
def test_lock_file(self):
211
# old format branches use a special lock file on sftp.
212
b = self.make_branch('', format=bzrdir.BzrDirFormat6())
213
b = bzrlib.branch.Branch.open(self.get_url())
214
self.failUnlessExists('.bzr/')
215
self.failUnlessExists('.bzr/branch-format')
216
self.failUnlessExists('.bzr/branch-lock')
218
self.failIf(lexists('.bzr/branch-lock.write-lock'))
220
self.failUnlessExists('.bzr/branch-lock.write-lock')
222
self.failIf(lexists('.bzr/branch-lock.write-lock'))
224
def test_push_support(self):
225
self.build_tree(['a/', 'a/foo'])
226
t = bzrdir.BzrDir.create_standalone_workingtree('a')
229
t.commit('foo', rev_id='a1')
231
b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
234
self.assertEquals(b2.revision_history(), ['a1'])
236
open('a/foo', 'wt').write('something new in foo\n')
237
t.commit('new', rev_id='a2')
240
self.assertEquals(b2.revision_history(), ['a1', 'a2'])
243
class SSHVendorConnection(TestCaseWithSFTPServer):
244
"""Test that the ssh vendors can all connect.
246
Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
248
We have 3 sftp implementations in the test suite:
249
'loopback': Doesn't use ssh, just uses a local socket. Most tests are
250
done this way to save the handshaking time, so it is not
252
'none': This uses paramiko's built-in ssh client and server, and layers
254
None: If 'ssh' exists on the machine, then it will be spawned as a
259
super(SSHVendorConnection, self).setUp()
260
from bzrlib.transport.sftp import SFTPFullAbsoluteServer
263
"""Just a wrapper so that when created, it will set _vendor"""
264
# SFTPFullAbsoluteServer can handle any vendor,
265
# it just needs to be set between the time it is instantiated
266
# and the time .setUp() is called
267
server = SFTPFullAbsoluteServer()
268
server._vendor = self._test_vendor
270
self._test_vendor = 'loopback'
271
self.vfs_transport_server = create_server
272
f = open('a_file', 'wb')
278
def set_vendor(self, vendor):
    """Record *vendor* on the test case as self._test_vendor."""
    self._test_vendor = vendor
281
def test_connection_paramiko(self):
    """Read a_file back through a transport using the paramiko vendor."""
    from bzrlib.transport import ssh
    paramiko_vendor = ssh.ParamikoVendor()
    self.set_vendor(paramiko_vendor)
    transport = self.get_transport()
    contents = transport.get('a_file').read()
    self.assertEqual('foobar\n', contents)
287
def test_connection_vendor(self):
    """Read a_file with the vendor set to None (currently always skipped)."""
    raise TestSkipped("We don't test spawning real ssh,"
                      " because it prompts for a password."
                      " Enable this test if we figure out"
                      " how to prevent this.")
    # Kept for when the skip above can be removed.
    self.set_vendor(None)
    transport = self.get_transport()
    contents = transport.get('a_file').read()
    self.assertEqual('foobar\n', contents)
297
class SSHVendorBadConnection(TestCaseWithTransport):
298
"""Test that the ssh vendors handle bad connection properly
300
We don't subclass TestCaseWithSFTPServer, because we don't actually
301
need an SFTP connection.
305
if not paramiko_loaded:
306
raise TestSkipped('you must have paramiko to run this test')
307
super(SSHVendorBadConnection, self).setUp()
308
import bzrlib.transport.ssh
310
# open a random port, so we know nobody else is using it
311
# but don't actually listen on the port.
313
s.bind(('localhost', 0))
314
self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
316
orig_vendor = bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor
318
bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
320
self.addCleanup(reset)
322
def set_vendor(self, vendor):
    """Install *vendor* as the ssh vendor manager's cached vendor."""
    import bzrlib.transport.ssh
    manager = bzrlib.transport.ssh._ssh_vendor_manager
    manager._cached_ssh_vendor = vendor
326
def test_bad_connection_paramiko(self):
    """Test that a real connection attempt raises the right error"""
    from bzrlib.transport import ssh
    paramiko_vendor = ssh.ParamikoVendor()
    self.set_vendor(paramiko_vendor)
    # bogus_url points at a port nothing is listening on.
    transport = bzrlib.transport.get_transport(self.bogus_url)
    self.assertRaises(errors.ConnectionError, transport.get, 'foobar')
333
def test_bad_connection_ssh(self):
334
"""None => auto-detect vendor"""
335
self.set_vendor(None)
336
# This is how I would normally test the connection code
337
# it makes it very clear what we are testing.
338
# However, 'ssh' will create stipple on the output, so instead
339
# I'm using run_bzr_subprocess, and parsing the output
341
# t = bzrlib.transport.get_transport(self.bogus_url)
342
# except errors.ConnectionError:
345
# except errors.NameError, e:
346
# if 'SSHException' in str(e):
347
# raise TestSkipped('Known NameError bug in paramiko 1.6.1')
350
# self.fail('Excepted ConnectionError to be raised')
352
out, err = self.run_bzr_subprocess(['log', self.bogus_url], retcode=3)
353
self.assertEqual('', out)
354
if "NameError: global name 'SSHException'" in err:
355
# We aren't fixing this bug, because it is a bug in
356
# paramiko, but we know about it, so we don't have to
358
raise TestSkipped('Known NameError bug with paramiko-1.6.1')
359
self.assertContainsRe(err, r'bzr: ERROR: Unable to connect to SSH host'
360
r' 127\.0\.0\.1:\d+; ')
363
class SFTPLatencyKnob(TestCaseWithSFTPServer):
364
"""Test that the testing SFTPServer's latency knob works."""
366
def test_latency_knob_slows_transport(self):
    """Setting add_latency on the server makes the first request slow."""
    # The knob is set to 500ms; an ordinary loopback connection takes
    # about 40ms.
    began = time.time()
    self.get_server().add_latency = 0.5
    transport = self.get_transport()
    # Issue a request so that the connection is actually made.
    transport.has('not me')
    elapsed = time.time() - began
    self.assertTrue(elapsed > 0.4)
376
def test_default(self):
    """Time a request with no latency set (currently always skipped)."""
    # Potentially brittle: under extremely high machine load the timing
    # check could fail, though that is quite unlikely.
    raise TestSkipped('Timing-sensitive test')
    began = time.time()
    transport = self.get_transport()
    # Issue a request so that the connection is actually made.
    transport.has('not me')
    elapsed = time.time() - began
    self.assertTrue(elapsed < 0.5)
387
class FakeSocket(object):
388
"""Fake socket object used to test the SocketDelay wrapper without
395
def send(self, data, flags=0):
399
def sendall(self, data, flags=0):
403
def recv(self, size, flags=0):
404
if size < len(self._data):
405
result = self._data[:size]
406
self._data = self._data[size:]
414
class TestSocketDelay(TestCase):
418
if not paramiko_loaded:
419
raise TestSkipped('you must have paramiko to run this test')
421
def test_delay(self):
422
from bzrlib.transport.sftp import SocketDelay
423
sending = FakeSocket()
424
receiving = SocketDelay(sending, 0.1, bandwidth=1000000,
426
# check that simulated time is charged only per round-trip:
427
t1 = SocketDelay.simulated_time
428
receiving.send("connect1")
429
self.assertEqual(sending.recv(1024), "connect1")
430
t2 = SocketDelay.simulated_time
431
self.assertAlmostEqual(t2 - t1, 0.1)
432
receiving.send("connect2")
433
self.assertEqual(sending.recv(1024), "connect2")
434
sending.send("hello")
435
self.assertEqual(receiving.recv(1024), "hello")
436
t3 = SocketDelay.simulated_time
437
self.assertAlmostEqual(t3 - t2, 0.1)
438
sending.send("hello")
439
self.assertEqual(receiving.recv(1024), "hello")
440
sending.send("hello")
441
self.assertEqual(receiving.recv(1024), "hello")
442
sending.send("hello")
443
self.assertEqual(receiving.recv(1024), "hello")
444
t4 = SocketDelay.simulated_time
445
self.assertAlmostEqual(t4, t3)
447
def test_bandwidth(self):
448
from bzrlib.transport.sftp import SocketDelay
449
sending = FakeSocket()
450
receiving = SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
452
# check that simulated time is charged only per round-trip:
453
t1 = SocketDelay.simulated_time
454
receiving.send("connect")
455
self.assertEqual(sending.recv(1024), "connect")
456
sending.send("a" * 100)
457
self.assertEqual(receiving.recv(1024), "a" * 100)
458
t2 = SocketDelay.simulated_time
459
self.assertAlmostEqual(t2 - t1, 100 + 7)
34
-----BEGIN RSA PRIVATE KEY-----
35
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
36
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
37
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
38
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
39
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
40
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
41
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
42
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
43
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
44
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
45
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
46
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
47
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
48
-----END RSA PRIVATE KEY-----
52
class SingleListener (threading.Thread):
53
def __init__(self, callback):
54
threading.Thread.__init__(self)
55
self._callback = callback
56
self._socket = socket.socket()
57
self._socket.listen(1)
58
self.port = self._socket.getsockname()[1]
59
self.stop_event = threading.Event()
62
s, _ = self._socket.accept()
63
# now close the listen socket
65
self._callback(s, self.stop_event)
71
class TestCaseWithSFTPServer (TestCaseInTempDir):
73
Execute a test case with a stub SFTP server, serving files from the local
74
filesystem over the loopback network.
77
def _run_server(self, s, stop_event):
78
ssh_server = paramiko.Transport(s)
79
key_file = os.path.join(self._root, 'test_rsa.key')
80
file(key_file, 'w').write(STUB_SERVER_KEY)
81
host_key = paramiko.RSAKey.from_private_key_file(key_file)
82
ssh_server.add_server_key(host_key)
84
ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
85
event = threading.Event()
86
ssh_server.start_server(event, server)
91
TestCaseInTempDir.setUp(self)
92
self._root = self.test_dir
94
self._listener = SingleListener(self._run_server)
95
self._listener.setDaemon(True)
96
self._listener.start()
97
self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
100
self._listener.stop()
101
TestCaseInTempDir.tearDown(self)
104
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
107
def get_transport(self):
108
from bzrlib.transport.sftp import SFTPTransport
110
return SFTPTransport(url)
112
# Without paramiko, remove the sftp transport test class from this module.
if not paramiko_loaded:
    del SFTPTransportTest