1
# Copyright (C) 2005-2010 Robey Pointer <robey@lag.net>
2
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
4
3
# This program is free software; you can redistribute it and/or modify
5
4
# it under the terms of the GNU General Public License as published by
6
5
# the Free Software Foundation; either version 2 of the License, or
7
6
# (at your option) any later version.
9
8
# This program is distributed in the hope that it will be useful,
10
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
11
# GNU General Public License for more details.
14
13
# You should have received a copy of the GNU General Public License
15
14
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
29
transport as _mod_transport,
32
from bzrlib.osutils import (
37
from bzrlib.tests import (
39
TestCaseWithTransport,
43
from bzrlib.tests.http_server import HttpServer
44
from bzrlib.transport import get_transport
45
import bzrlib.transport.http
47
if features.paramiko.available():
48
from bzrlib.transport import sftp as _mod_sftp
49
from bzrlib.tests import stub_sftp
51
from bzrlib.workingtree import WorkingTree
54
def set_test_transport_to_sftp(testcase):
55
"""A helper to set transports on test case instances."""
56
if getattr(testcase, '_get_remote_is_absolute', None) is None:
57
testcase._get_remote_is_absolute = True
58
if testcase._get_remote_is_absolute:
59
testcase.transport_server = stub_sftp.SFTPAbsoluteServer
61
testcase.transport_server = stub_sftp.SFTPHomeDirServer
62
testcase.transport_readonly_server = HttpServer
65
class TestCaseWithSFTPServer(TestCaseWithTransport):
66
"""A test case base class that provides a sftp server on localhost."""
22
from bzrlib.tests import TestCaseInTempDir
23
from bzrlib.tests.test_transport import TestTransportMixIn
27
from stub_sftp import StubServer, StubSFTPServer
28
paramiko_loaded = True
30
paramiko_loaded = False
34
-----BEGIN RSA PRIVATE KEY-----
35
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
36
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
37
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
38
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
39
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
40
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
41
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
42
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
43
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
44
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
45
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
46
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
47
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
48
-----END RSA PRIVATE KEY-----
52
class SingleListener (threading.Thread):
53
def __init__(self, callback):
    """Initialise the thread and open a listening socket on localhost.

    :param callback: callable kept on the instance as ``_callback``.
    """
    threading.Thread.__init__(self)
    self._callback = callback
    self._socket = listen_sock = socket.socket()
    listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # Port 0: let the OS pick a free port; it is read back just below.
    listen_sock.bind(('localhost', 0))
    listen_sock.listen(1)
    bound_address = listen_sock.getsockname()
    self.port = bound_address[1]
    self.stop_event = threading.Event()
64
s, _ = self._socket.accept()
65
# now close the listen socket
67
self._callback(s, self.stop_event)
73
class TestCaseWithSFTPServer (TestCaseInTempDir):
75
Execute a test case with a stub SFTP server, serving files from the local
76
filesystem over the loopback network.
79
def _run_server(self, s, stop_event):
80
ssh_server = paramiko.Transport(s)
81
key_file = os.path.join(self._root, 'test_rsa.key')
82
file(key_file, 'w').write(STUB_SERVER_KEY)
83
host_key = paramiko.RSAKey.from_private_key_file(key_file)
84
ssh_server.add_server_key(host_key)
86
ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
87
event = threading.Event()
88
ssh_server.start_server(event, server)
69
super(TestCaseWithSFTPServer, self).setUp()
70
self.requireFeature(features.paramiko)
71
set_test_transport_to_sftp(self)
74
class SFTPLockTests(TestCaseWithSFTPServer):
93
TestCaseInTempDir.setUp(self)
94
self._root = self.test_dir
96
def delayed_setup(self):
    """Start the listener thread and record the sftp URL that reaches it.

    Some tests are just stubs that call setUp and then immediately call
    tearDown, so the port listener is not created until get_transport is
    called and we know we're in an actual test.
    """
    self._listener = listener = SingleListener(self._run_server)
    listener.setDaemon(True)
    listener.start()
    self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (listener.port,)
107
self._listener.stop()
108
except AttributeError:
110
TestCaseInTempDir.tearDown(self)
113
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
117
def get_transport(self):
121
from bzrlib.transport.sftp import SFTPTransport
123
return SFTPTransport(url)
76
125
def test_sftp_locks(self):
77
126
from bzrlib.errors import LockError
102
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
    """Check how the SFTP transport maps relative paths to remote paths."""

    def test__remote_path(self):
        if sys.platform == 'darwin':
            # This test is about sftp absolute path handling. There is
            # already (in this test) a TODO about windows needing an absolute
            # path without drive letter. To me, using self.test_dir is a
            # trick to get an absolute path for comparison purposes. That
            # fails for OSX because the sftp server doesn't resolve the links
            # (and it doesn't have to). --vila 20070924
            self.knownFailure('Mac OSX symlinks /tmp to /private/tmp,'
                              ' testing against self.test_dir'
                              ' is not appropriate')
        transport = self.get_transport()
        # This test requires a unix-like absolute path.
        base_dir = self.test_dir
        if sys.platform == 'win32':
            # using hack suggested by John Meinel.
            # TODO: write another mock server for this test
            #       and use absolute path without drive letter
            base_dir = '/' + base_dir
        # try what is currently used:
        # remote path = self._abspath(relpath)
        self.assertIsSameRealPath(base_dir + '/relative',
                                  transport._remote_path('relative'))
        # We don't use os.path.join because windows gives us the wrong path.
        parent_dir = '/'.join(base_dir.split('/')[:-1])
        # .. should be honoured
        self.assertIsSameRealPath(parent_dir + '/sibling',
                                  transport._remote_path('../sibling'))
        # / should be illegal ?
        ### FIXME decide and then test for all transports. RBC20051208
138
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
139
"""Test the SFTP transport with homedir based relative paths."""
142
# Only SFTPHomeDirServer is tested here
143
self._get_remote_is_absolute = False
144
super(SFTPTransportTestRelativeRoot, self).setUp()
146
def test__remote_path_relative_root(self):
    # Relative paths are preserved: a transport opened on '' keeps the
    # '/~/' path.
    transport = self.get_transport('')
    self.assertEqual('/~/', transport._path)
    # The remote path should be relative to the home dir
    # (i.e. not beginning with a '/').
    remote = transport._remote_path('a')
    self.assertEqual('a', remote)
155
class SFTPNonServerTest(TestCase):
158
self.requireFeature(features.paramiko)
160
def test_parse_url_with_home_dir(self):
161
s = _mod_sftp.SFTPTransport(
162
'sftp://ro%62ey:h%40t@example.com:2222/~/relative')
151
class FakeSFTPTransport (object):
153
fake = FakeSFTPTransport()
156
class SFTPNonServerTest(unittest.TestCase):
157
def test_parse_url(self):
158
from bzrlib.transport.sftp import SFTPTransport
159
s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
160
self.assertEquals(s._host, 'simple.example.com')
161
self.assertEquals(s._port, 22)
162
self.assertEquals(s._path, '/home/source')
163
self.assert_(s._password is None)
165
s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
163
166
self.assertEquals(s._host, 'example.com')
164
167
self.assertEquals(s._port, 2222)
165
self.assertEquals(s._user, 'robey')
168
self.assertEquals(s._username, 'robey')
166
169
self.assertEquals(s._password, 'h@t')
167
self.assertEquals(s._path, '/~/relative/')
169
def test_relpath(self):
    # relpath() on a transport rooted at /abs/path must reject a URL on the
    # same host that lives under '/~/' instead.
    abs_transport = _mod_sftp.SFTPTransport('sftp://user@host.com/abs/path')
    home_relative_url = 'sftp://user@host.com/~/rel/path/sub'
    self.assertRaises(errors.PathNotChild, abs_transport.relpath,
                      home_relative_url)
174
def test_get_paramiko_vendor(self):
175
"""Test that if no 'ssh' is available we get builtin paramiko"""
176
from bzrlib.transport import ssh
177
# set '.' as the only location in the path, forcing no 'ssh' to exist
178
orig_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
179
orig_path = set_or_unset_env('PATH', '.')
181
# No vendor defined yet, query for one
182
ssh._ssh_vendor_manager.clear_cache()
183
vendor = ssh._get_ssh_vendor()
184
self.assertIsInstance(vendor, ssh.ParamikoVendor)
186
set_or_unset_env('PATH', orig_path)
187
ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
189
def test_abspath_root_sibling_server(self):
190
server = stub_sftp.SFTPSiblingAbsoluteServer()
191
server.start_server()
193
transport = get_transport(server.get_url())
194
self.assertFalse(transport.abspath('/').endswith('/~/'))
195
self.assertTrue(transport.abspath('/').endswith('/'))
170
self.assertEquals(s._path, 'relative')
172
def test_parse_invalid_url(self):
173
from bzrlib.transport.sftp import SFTPTransport, SFTPTransportError
175
s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
177
self.fail('expected exception not raised')
178
except SFTPTransportError, e:
179
self.assertEquals(str(e),
180
'~janneke: invalid port number')
201
184
class SFTPBranchTest(TestCaseWithSFTPServer):
202
185
"""Test some stuff when accessing a bzr Branch over sftp"""
204
187
def test_lock_file(self):
205
# old format branches use a special lock file on sftp.
206
b = self.make_branch('', format=bzrdir.BzrDirFormat6())
207
b = bzrlib.branch.Branch.open(self.get_url())
188
"""Make sure that a Branch accessed over sftp tries to lock itself."""
189
from bzrlib.branch import Branch
192
b = Branch.initialize(self._sftp_url)
208
193
self.failUnlessExists('.bzr/')
209
194
self.failUnlessExists('.bzr/branch-format')
210
195
self.failUnlessExists('.bzr/branch-lock')
212
self.failIf(lexists('.bzr/branch-lock.write-lock'))
197
self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
214
199
self.failUnlessExists('.bzr/branch-lock.write-lock')
216
self.failIf(lexists('.bzr/branch-lock.write-lock'))
218
def test_push_support(self):
219
self.build_tree(['a/', 'a/foo'])
220
t = bzrdir.BzrDir.create_standalone_workingtree('a')
223
t.commit('foo', rev_id='a1')
225
b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
228
self.assertEquals(b2.revision_history(), ['a1'])
230
open('a/foo', 'wt').write('something new in foo\n')
231
t.commit('new', rev_id='a2')
234
self.assertEquals(b2.revision_history(), ['a1', 'a2'])
237
class SSHVendorConnection(TestCaseWithSFTPServer):
238
"""Test that the ssh vendors can all connect.
240
Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
242
We have 3 sftp implementations in the test suite:
243
'loopback': Doesn't use ssh, just uses a local socket. Most tests are
244
done this way to save the handshaking time, so it is not
246
'none': This uses paramiko's built-in ssh client and server, and layers
248
None: If 'ssh' exists on the machine, then it will be spawned as a
253
super(SSHVendorConnection, self).setUp()
256
"""Just a wrapper so that when created, it will set _vendor"""
257
# SFTPFullAbsoluteServer can handle any vendor,
258
# it just needs to be set between the time it is instantiated
259
# and the time .setUp() is called
260
server = stub_sftp.SFTPFullAbsoluteServer()
261
server._vendor = self._test_vendor
263
self._test_vendor = 'loopback'
264
self.vfs_transport_server = create_server
265
f = open('a_file', 'wb')
271
def set_vendor(self, vendor):
    """Remember ``vendor`` on the test case as ``_test_vendor``.

    :param vendor: value stored unchanged; None is accepted.
    """
    self._test_vendor = vendor
274
def test_connection_paramiko(self):
    # Select the paramiko vendor, then read a_file back through a transport.
    from bzrlib.transport import ssh
    paramiko_vendor = ssh.ParamikoVendor()
    self.set_vendor(paramiko_vendor)
    transport = self.get_transport()
    content = transport.get('a_file').read()
    self.assertEqual('foobar\n', content)
280
def test_connection_vendor(self):
    skip_reason = ("We don't test spawning real ssh,"
                   " because it prompts for a password."
                   " Enable this test if we figure out"
                   " how to prevent this.")
    raise TestSkipped(skip_reason)
    # Everything below is currently unreachable; it is kept as the body the
    # test should run once the skip above can be removed.
    self.set_vendor(None)
    transport = self.get_transport()
    content = transport.get('a_file').read()
    self.assertEqual('foobar\n', content)
290
class SSHVendorBadConnection(TestCaseWithTransport):
291
"""Test that the ssh vendors handle bad connection properly
293
We don't subclass TestCaseWithSFTPServer, because we don't actually
294
need an SFTP connection.
298
self.requireFeature(features.paramiko)
299
super(SSHVendorBadConnection, self).setUp()
301
# open a random port, so we know nobody else is using it
302
# but don't actually listen on the port.
304
s.bind(('localhost', 0))
305
self.addCleanup(s.close)
306
self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
308
def set_vendor(self, vendor):
    """Replace the ssh vendor manager's cached vendor with ``vendor``.

    The change goes through ``self.overrideAttr`` (presumably undone by the
    test framework at cleanup -- confirm against bzrlib.tests.TestCase).
    """
    from bzrlib.transport import ssh
    vendor_manager = ssh._ssh_vendor_manager
    self.overrideAttr(vendor_manager, '_cached_ssh_vendor', vendor)
312
def test_bad_connection_paramiko(self):
    """Test that a real connection attempt raises the right error"""
    from bzrlib.transport import ssh
    paramiko_vendor = ssh.ParamikoVendor()
    self.set_vendor(paramiko_vendor)
    # bogus_url points at a bound port that nothing listens on.
    transport = bzrlib.transport.get_transport(self.bogus_url)
    self.assertRaises(errors.ConnectionError, transport.get, 'foobar')
319
def test_bad_connection_ssh(self):
    """None => auto-detect vendor"""
    self.set_vendor(None)
    # The clearest way to test the connection code would be to call
    # bzrlib.transport.get_transport(self.bogus_url) in-process and expect
    # errors.ConnectionError from a request on it. However, 'ssh' will
    # create stipple on the output, so instead this runs bzr in a
    # subprocess and parses what it prints.
    out, err = self.run_bzr_subprocess(['log', self.bogus_url], retcode=3)
    self.assertEqual('', out)
    hit_paramiko_bug = "NameError: global name 'SSHException'" in err
    if hit_paramiko_bug:
        # We aren't fixing this bug, because it is a bug in paramiko, but
        # we know about it, so we don't have to fail.
        raise TestSkipped('Known NameError bug with paramiko-1.6.1')
    expected_error = (r'bzr: ERROR: Unable to connect to SSH host'
                      r' 127\.0\.0\.1:\d+; ')
    self.assertContainsRe(err, expected_error)
349
class SFTPLatencyKnob(TestCaseWithSFTPServer):
    """Exercise the ``add_latency`` knob of the testing SFTP server."""

    def test_latency_knob_slows_transport(self):
        # Set the latency knob to 500ms. We take about 40ms for a loopback
        # connection ordinarily.
        began = time.time()
        server = self.get_server()
        server.add_latency = 0.5
        t = self.get_transport()
        t.has('not me')  # Force connection by issuing a request
        elapsed = time.time() - began
        self.assertTrue(elapsed > 0.4)

    def test_default(self):
        # This test is potentially brittle: under extremely high machine
        # load it could fail, but that is quite unlikely.
        raise TestSkipped('Timing-sensitive test')
        # Unreachable while the skip above is in place; kept as the intended
        # body of the test.
        began = time.time()
        t = self.get_transport()
        t.has('not me')  # Force connection by issuing a request
        elapsed = time.time() - began
        self.assertTrue(elapsed < 0.5)
373
class FakeSocket(object):
374
"""Fake socket object used to test the SocketDelay wrapper without
381
def send(self, data, flags=0):
385
def sendall(self, data, flags=0):
389
def recv(self, size, flags=0):
390
if size < len(self._data):
391
result = self._data[:size]
392
self._data = self._data[size:]
400
class TestSocketDelay(TestCase):
404
self.requireFeature(features.paramiko)
406
def test_delay(self):
407
sending = FakeSocket()
408
receiving = stub_sftp.SocketDelay(sending, 0.1, bandwidth=1000000,
410
# check that simulated time is charged only per round-trip:
411
t1 = stub_sftp.SocketDelay.simulated_time
412
receiving.send("connect1")
413
self.assertEqual(sending.recv(1024), "connect1")
414
t2 = stub_sftp.SocketDelay.simulated_time
415
self.assertAlmostEqual(t2 - t1, 0.1)
416
receiving.send("connect2")
417
self.assertEqual(sending.recv(1024), "connect2")
418
sending.send("hello")
419
self.assertEqual(receiving.recv(1024), "hello")
420
t3 = stub_sftp.SocketDelay.simulated_time
421
self.assertAlmostEqual(t3 - t2, 0.1)
422
sending.send("hello")
423
self.assertEqual(receiving.recv(1024), "hello")
424
sending.send("hello")
425
self.assertEqual(receiving.recv(1024), "hello")
426
sending.send("hello")
427
self.assertEqual(receiving.recv(1024), "hello")
428
t4 = stub_sftp.SocketDelay.simulated_time
429
self.assertAlmostEqual(t4, t3)
431
def test_bandwidth(self):
432
sending = FakeSocket()
433
receiving = stub_sftp.SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
435
# check that simulated time is charged only per round-trip:
436
t1 = stub_sftp.SocketDelay.simulated_time
437
receiving.send("connect")
438
self.assertEqual(sending.recv(1024), "connect")
439
sending.send("a" * 100)
440
self.assertEqual(receiving.recv(1024), "a" * 100)
441
t2 = stub_sftp.SocketDelay.simulated_time
442
self.assertAlmostEqual(t2 - t1, 100 + 7)
445
class ReadvFile(object):
446
"""An object that acts like Paramiko's SFTPFile.readv()"""
448
def __init__(self, data):
451
def readv(self, requests):
    """Lazily yield one slice of ``self._data`` per request.

    :param requests: iterable of (start, length) pairs.
    :return: generator of ``self._data[start:start+length]`` values, in
        the order the requests were given.
    """
    for offset, size in requests:
        stop = offset + size
        yield self._data[offset:stop]
456
def _null_report_activity(*a, **k):
460
class Test_SFTPReadvHelper(tests.TestCase):
462
def checkGetRequests(self, expected_requests, offsets):
    """Assert that a readv helper built from ``offsets`` plans the
    ``expected_requests``.

    :param expected_requests: value compared against
        ``helper._get_requests()``.
    :param offsets: offsets handed to ``_SFTPReadvHelper``.
    """
    self.requireFeature(features.paramiko)
    readv_helper = _mod_sftp._SFTPReadvHelper(
        offsets, 'artificial_test', _null_report_activity)
    actual_requests = readv_helper._get_requests()
    self.assertEqual(expected_requests, actual_requests)
468
def test__get_requests(self):
469
# Small single requests become a single readv request
470
self.checkGetRequests([(0, 100)],
471
[(0, 20), (30, 50), (20, 10), (80, 20)])
472
# Non-contiguous ranges are given as multiple requests
473
self.checkGetRequests([(0, 20), (30, 50)],
474
[(10, 10), (30, 20), (0, 10), (50, 30)])
475
# Ranges larger than _max_request_size (32kB) are broken up into
476
# multiple requests, even if it actually spans multiple logical
478
self.checkGetRequests([(0, 32768), (32768, 32768), (65536, 464)],
479
[(0, 40000), (40000, 100), (40100, 1900),
482
def checkRequestAndYield(self, expected, data, offsets):
    """Assert what the readv helper yields for ``offsets`` over ``data``.

    :param expected: list compared against everything yielded by
        ``request_and_yield_offsets``.
    :param data: content wrapped in a ``ReadvFile`` and served to the helper.
    :param offsets: offsets handed to ``_SFTPReadvHelper``.
    """
    self.requireFeature(features.paramiko)
    readv_helper = _mod_sftp._SFTPReadvHelper(
        offsets, 'artificial_test', _null_report_activity)
    fake_file = ReadvFile(data)
    yielded = list(readv_helper.request_and_yield_offsets(fake_file))
    self.assertEqual(expected, yielded)
490
def test_request_and_yield_offsets(self):
    data = 'abcdefghijklmnopqrstuvwxyz'
    # Each scenario is (expected yields, requested offsets); they are
    # checked in order against the same data.
    scenarios = [
        # Three separate ranges.
        ([(0, 'a'), (5, 'f'), (10, 'klm')],
         [(0, 1), (5, 1), (10, 3)]),
        # Should combine requests, and split them again
        ([(0, 'a'), (1, 'b'), (10, 'klm')],
         [(0, 1), (1, 1), (10, 3)]),
        # Out of order requests. The requests should get combined, but then
        # be yielded out-of-order. We also need one that is at the end of a
        # previous range. See bug #293746
        ([(0, 'a'), (10, 'k'), (4, 'efg'), (1, 'bcd')],
         [(0, 1), (10, 1), (4, 3), (1, 3)]),
    ]
    for expected, offsets in scenarios:
        self.checkRequestAndYield(expected, data, offsets)
504
class TestUsesAuthConfig(TestCaseWithSFTPServer):
505
"""Test that AuthenticationConfig can supply default usernames."""
507
def get_transport_for_connection(self, set_config):
508
port = self.get_server()._listener.port
510
conf = config.AuthenticationConfig()
511
conf._get_config().update(
512
{'sftptest': {'scheme': 'ssh', 'port': port, 'user': 'bar'}})
514
t = get_transport('sftp://localhost:%d' % port)
515
# force a connection to be performed.
519
def test_sftp_uses_config(self):
    # With the authentication config in place, the first credential is the
    # configured user.
    transport = self.get_transport_for_connection(set_config=True)
    credentials = transport._get_credentials()
    self.assertEqual('bar', credentials[0])
523
def test_sftp_is_none_if_no_config(self):
    # Without the authentication config, the first credential stays None.
    transport = self.get_transport_for_connection(set_config=False)
    credentials = transport._get_credentials()
    self.assertIs(None, credentials[0])
527
def test_sftp_doesnt_prompt_username(self):
    # Install a UI factory with canned stdin and a captured stdout so any
    # prompting would be observable.
    captured_out = tests.StringIOWrapper()
    ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
                                        stdout=captured_out)
    transport = self.get_transport_for_connection(set_config=False)
    credentials = transport._get_credentials()
    self.assertIs(None, credentials[0])
    # No prompts should've been printed, stdin shouldn't have been read
    self.assertEquals("", captured_out.getvalue())
    self.assertEquals(0, ui.ui_factory.stdin.tell())
201
self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
204
if not paramiko_loaded:
206
del SFTPTransportTest
207
del SFTPNonServerTest