1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
2
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
4
3
# This program is free software; you can redistribute it and/or modify
5
4
# it under the terms of the GNU General Public License as published by
6
5
# the Free Software Foundation; either version 2 of the License, or
7
6
# (at your option) any later version.
9
8
# This program is distributed in the hope that it will be useful,
10
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
11
# GNU General Public License for more details.
14
13
# You should have received a copy of the GNU General Public License
15
14
# along with this program; if not, write to the Free Software
16
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24
import bzrlib.bzrdir as bzrdir
25
import bzrlib.errors as errors
26
from bzrlib.osutils import pathjoin, lexists, set_or_unset_env
27
from bzrlib.tests import TestCaseWithTransport, TestCase, TestSkipped
28
from bzrlib.tests.HttpServer import HttpServer
29
import bzrlib.transport
30
from bzrlib.transport import get_transport
31
import bzrlib.transport.http
32
from bzrlib.workingtree import WorkingTree
22
from bzrlib.selftest import TestCaseInTempDir
23
from bzrlib.selftest.testtransport import TestTransportMixIn
27
from stub_sftp import StubServer, StubSFTPServer
36
28
paramiko_loaded = True
37
29
except ImportError:
38
30
paramiko_loaded = False
41
def set_test_transport_to_sftp(testcase):
    """A helper to set transports on test case instances.

    If ``testcase`` has no ``_get_remote_is_absolute`` attribute (or it is
    None) the attribute is set to True.  The read-write transport server is
    then SFTPAbsoluteServer when the flag is true and SFTPHomeDirServer
    otherwise; the read-only server is always HttpServer.

    :param testcase: the test case instance to configure in place.
    """
    # Imported lazily: bzrlib.transport.sftp needs paramiko, which may be
    # missing (see the module-level paramiko_loaded flag).
    from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
    if getattr(testcase, '_get_remote_is_absolute', None) is None:
        testcase._get_remote_is_absolute = True
    if testcase._get_remote_is_absolute:
        testcase.transport_server = SFTPAbsoluteServer
    else:
        testcase.transport_server = SFTPHomeDirServer
    testcase.transport_readonly_server = HttpServer
53
class TestCaseWithSFTPServer(TestCaseWithTransport):
    """A test case base class that provides a sftp server on localhost."""

    def setUp(self):
        """Skip when paramiko is unavailable, else select SFTP transports."""
        super(TestCaseWithSFTPServer, self).setUp()
        if not paramiko_loaded:
            raise TestSkipped('you must have paramiko to run this test')
        set_test_transport_to_sftp(self)

    def get_transport(self, path=None):
        """Return a transport relative to self._test_root.

        :param path: optional path passed through to self.get_url().
        """
        return bzrlib.transport.get_transport(self.get_url(path))
67
class SFTPLockTests (TestCaseWithSFTPServer):
69
def test_sftp_locks(self):
70
from bzrlib.errors import LockError
71
t = self.get_transport()
73
l = t.lock_write('bogus')
74
self.failUnlessExists('bogus.write-lock')
76
# Don't wait for the lock, locking an already locked
77
# file should raise an assert
78
self.assertRaises(LockError, t.lock_write, 'bogus')
81
self.failIf(lexists('bogus.write-lock'))
83
open('something.write-lock', 'wb').write('fake lock\n')
84
self.assertRaises(LockError, t.lock_write, 'something')
85
os.remove('something.write-lock')
87
l = t.lock_write('something')
89
l2 = t.lock_write('bogus')
94
def test_multiple_connections(self):
    """A second transport must reuse the first transport's connection."""
    # The transports are created only for their side effect on the
    # server's connection log.
    t = self.get_transport()
    self.assertTrue('sftpserver - new connection' in self.get_server().logs)
    self.get_server().logs = []
    # The second request should reuse the first connection
    # SingleListener only allows for a single connection,
    # So the next line fails unless the connection is reused
    t2 = self.get_transport()
    self.assertEqual(self.get_server().logs, [])
105
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
    """Test the SFTP transport with homedir based relative paths."""

    def test__remote_path(self):
        """_remote_path() maps relative paths under (and above) test_dir."""
        t = self.get_transport()
        # This test requires a unix-like absolute path
        test_dir = self.test_dir
        if sys.platform == 'win32':
            # using hack suggested by John Meinel.
            # TODO: write another mock server for this test
            #       and use absolute path without drive letter
            test_dir = '/' + test_dir
        # try what is currently used:
        # remote path = self._abspath(relpath)
        self.assertEqual(test_dir + '/relative', t._remote_path('relative'))
        # we don't os.path.join because windows gives us the wrong path
        root_segments = test_dir.split('/')
        root_parent = '/'.join(root_segments[:-1])
        # .. should be honoured
        self.assertEqual(root_parent + '/sibling',
                         t._remote_path('../sibling'))
        # / should be illegal ?
        ### FIXME decide and then test for all transports. RBC20051208
129
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
    """Test the SFTP transport with homedir based relative paths."""

    def setUp(self):
        """Request a non-absolute remote root before the base setUp runs."""
        # Must be set before the base setUp runs: the base setUp reads this
        # flag when choosing the transport server class.
        self._get_remote_is_absolute = False
        super(SFTPTransportTestRelativeRoot, self).setUp()

    def test__remote_path_relative_root(self):
        """Relative paths are preserved when the remote root is ''."""
        # relative paths are preserved
        t = self.get_transport('')
        # the remote path should be ''
        self.assertEqual('', t._path)
        self.assertEqual('a', t._remote_path('a'))
144
class FakeSFTPTransport (object):
146
fake = FakeSFTPTransport()
149
class SFTPNonServerTest(TestCase):
152
if not paramiko_loaded:
153
raise TestSkipped('you must have paramiko to run this test')
155
def test_parse_url(self):
    """SFTPTransport splits a URL into host, port, user, password, path."""
    from bzrlib.transport.sftp import SFTPTransport
    s = SFTPTransport('sftp://simple.example.com/home/source',
                      clone_from=fake)
    self.assertEqual(s._host, 'simple.example.com')
    self.assertEqual(s._port, None)
    self.assertEqual(s._path, '/home/source')
    self.assertTrue(s._password is None)

    self.assertEqual(s.base, 'sftp://simple.example.com/home/source/')

    # Percent-escapes in the user and password parts must be decoded.
    s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative',
                      clone_from=fake)
    self.assertEqual(s._host, 'example.com')
    self.assertEqual(s._port, 2222)
    self.assertEqual(s._username, 'robey')
    self.assertEqual(s._password, 'h@t')
    self.assertEqual(s._path, 'relative')

    # Base should not keep track of the password
    self.assertEqual(s.base, 'sftp://robey@example.com:2222/~/relative/')
175
def test_relpath(self):
    """relpath() accepts child URLs and rejects URLs on other bases."""
    from bzrlib.transport.sftp import SFTPTransport
    from bzrlib.errors import PathNotChild

    s = SFTPTransport('sftp://user@host.com/abs/path', clone_from=fake)
    self.assertEqual(s.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
    # Can't test this one, because we actually get an AssertionError
    # TODO: Consider raising an exception rather than an assert
    #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com/abs/path/sub')
    # A different user, host, port or root is not a child of this base.
    for other_url in ('sftp://user2@host.com/abs/path/sub',
                      'sftp://user@otherhost.com/abs/path/sub',
                      'sftp://user@host.com:33/abs/path/sub',
                      'sftp://user@host.com/~/rel/path/sub'):
        self.assertRaises(PathNotChild, s.relpath, other_url)

    # Make sure it works when we don't supply a username
    s = SFTPTransport('sftp://host.com/abs/path', clone_from=fake)
    self.assertEqual(s.relpath('sftp://host.com/abs/path/sub'), 'sub')

    # Make sure it works when parts of the path will be url encoded
    # TODO: These may be incorrect, we might need to urllib.urlencode() before
    # we pass the paths into the SFTPTransport constructor
    s = SFTPTransport('sftp://host.com/dev/,path', clone_from=fake)
    self.assertEqual(s.relpath('sftp://host.com/dev/,path/sub'), 'sub')
    s = SFTPTransport('sftp://host.com/dev/%path', clone_from=fake)
    self.assertEqual(s.relpath('sftp://host.com/dev/%path/sub'), 'sub')
201
def test_parse_invalid_url(self):
202
from bzrlib.transport.sftp import SFTPTransport, TransportError
204
s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
206
self.fail('expected exception not raised')
207
except TransportError, e:
208
self.assertEquals(str(e),
210
'invalid port number ~janneke in url:\n'
211
'sftp://lilypond.org:~janneke/public_html/bzr/gub ')
213
def test_get_paramiko_vendor(self):
    """Test that if no 'ssh' is available we get builtin paramiko"""
    from bzrlib.transport import ssh
    # set '.' as the only location in the path, forcing no 'ssh' to exist
    orig_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
    # presumably set_or_unset_env returns the previous value; it is handed
    # back to the same function below to restore PATH.
    orig_path = set_or_unset_env('PATH', '.')
    try:
        # No vendor defined yet, query for one
        ssh._ssh_vendor_manager.clear_cache()
        vendor = ssh._get_ssh_vendor()
        self.assertIsInstance(vendor, ssh.ParamikoVendor)
    finally:
        # Always restore global state, even when the assertion fails, so
        # later tests do not see a clobbered PATH or cached vendor.
        set_or_unset_env('PATH', orig_path)
        ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
228
def test_abspath_root_sibling_server(self):
229
from bzrlib.transport.sftp import SFTPSiblingAbsoluteServer
230
server = SFTPSiblingAbsoluteServer()
233
transport = get_transport(server.get_url())
234
self.assertFalse(transport.abspath('/').endswith('/~/'))
235
self.assertTrue(transport.abspath('/').endswith('/'))
241
class SFTPBranchTest(TestCaseWithSFTPServer):
242
"""Test some stuff when accessing a bzr Branch over sftp"""
244
def test_lock_file(self):
245
# old format branches use a special lock file on sftp.
246
b = self.make_branch('', format=bzrdir.BzrDirFormat6())
247
b = bzrlib.branch.Branch.open(self.get_url())
248
self.failUnlessExists('.bzr/')
249
self.failUnlessExists('.bzr/branch-format')
250
self.failUnlessExists('.bzr/branch-lock')
252
self.failIf(lexists('.bzr/branch-lock.write-lock'))
254
self.failUnlessExists('.bzr/branch-lock.write-lock')
256
self.failIf(lexists('.bzr/branch-lock.write-lock'))
258
def test_push_support(self):
259
self.build_tree(['a/', 'a/foo'])
260
t = bzrdir.BzrDir.create_standalone_workingtree('a')
263
t.commit('foo', rev_id='a1')
265
b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
268
self.assertEquals(b2.revision_history(), ['a1'])
270
open('a/foo', 'wt').write('something new in foo\n')
271
t.commit('new', rev_id='a2')
274
self.assertEquals(b2.revision_history(), ['a1', 'a2'])
277
class SSHVendorConnection(TestCaseWithSFTPServer):
278
"""Test that the ssh vendors can all connect.
280
Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
282
We have 3 sftp implementations in the test suite:
283
'loopback': Doesn't use ssh, just uses a local socket. Most tests are
284
done this way to save the handshaking time, so it is not
286
'none': This uses paramiko's built-in ssh client and server, and layers
288
None: If 'ssh' exists on the machine, then it will be spawned as a
293
super(SSHVendorConnection, self).setUp()
294
from bzrlib.transport.sftp import SFTPFullAbsoluteServer
297
"""Just a wrapper so that when created, it will set _vendor"""
298
# SFTPFullAbsoluteServer can handle any vendor,
299
# it just needs to be set between the time it is instantiated
300
# and the time .setUp() is called
301
server = SFTPFullAbsoluteServer()
302
server._vendor = self._test_vendor
304
self._test_vendor = 'loopback'
305
self.vfs_transport_server = create_server
306
f = open('a_file', 'wb')
312
def set_vendor(self, vendor):
    """Record the ssh vendor to use as self._test_vendor.

    :param vendor: the vendor value to store; stored unchanged.
    """
    self._test_vendor = vendor
315
def test_connection_paramiko(self):
    """Reading a_file through the ParamikoVendor returns its contents."""
    from bzrlib.transport import ssh
    self.set_vendor(ssh.ParamikoVendor())
    t = self.get_transport()
    self.assertEqual('foobar\n', t.get('a_file').read())
321
def test_connection_vendor(self):
    """Reading a_file through an auto-detected vendor (currently skipped)."""
    raise TestSkipped("We don't test spawning real ssh,"
                      " because it prompts for a password."
                      " Enable this test if we figure out"
                      " how to prevent this.")
    # Deliberately unreachable: kept so the test can be re-enabled by
    # removing the skip above.
    self.set_vendor(None)
    t = self.get_transport()
    self.assertEqual('foobar\n', t.get('a_file').read())
331
class SSHVendorBadConnection(TestCaseWithTransport):
332
"""Test that the ssh vendors handle bad connection properly
334
We don't subclass TestCaseWithSFTPServer, because we don't actually
335
need an SFTP connection.
339
if not paramiko_loaded:
340
raise TestSkipped('you must have paramiko to run this test')
341
super(SSHVendorBadConnection, self).setUp()
342
import bzrlib.transport.ssh
344
# open a random port, so we know nobody else is using it
345
# but don't actually listen on the port.
347
s.bind(('localhost', 0))
348
self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
350
orig_vendor = bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor
352
bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
354
self.addCleanup(reset)
356
def set_vendor(self, vendor):
    """Install ``vendor`` as the ssh vendor manager's cached vendor.

    :param vendor: the value to store in
        bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor.
    """
    import bzrlib.transport.ssh
    bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = vendor
360
def test_bad_connection_paramiko(self):
    """Test that a real connection attempt raises the right error"""
    from bzrlib.transport import ssh
    self.set_vendor(ssh.ParamikoVendor())
    self.assertRaises(errors.ConnectionError,
                      bzrlib.transport.get_transport, self.bogus_url)
367
def test_bad_connection_ssh(self):
    """None => auto-detect vendor"""
    self.set_vendor(None)
    # Calling bzrlib.transport.get_transport(self.bogus_url) in-process and
    # expecting errors.ConnectionError would make it very clear what we
    # are testing.  However, 'ssh' will create stipple on the output, so
    # instead we use run_bzr_subprocess and parse its output.
    out, err = self.run_bzr_subprocess('log', self.bogus_url, retcode=3)
    self.assertEqual('', out)
    if "NameError: global name 'SSHException'" in err:
        # We aren't fixing this bug, because it is a bug in
        # paramiko, but we know about it, so we skip the test
        # rather than report a failure.
        raise TestSkipped('Known NameError bug with paramiko-1.6.1')
    self.assertContainsRe(err, r'bzr: ERROR: Unable to connect to SSH host'
                               r' 127\.0\.0\.1:\d+; ')
397
class SFTPLatencyKnob(TestCaseWithSFTPServer):
    """Test that the testing SFTPServer's latency knob works."""

    def test_latency_knob_slows_transport(self):
        """With add_latency = 0.5, opening a transport takes over 0.4s."""
        # change the latency knob to 500ms. We take about 40ms for a
        # loopback connection ordinarily.
        start_time = time.time()
        self.get_server().add_latency = 0.5
        transport = self.get_transport()
        with_latency_knob_time = time.time() - start_time
        self.assertTrue(with_latency_knob_time > 0.4)

    def test_default(self):
        """Without the knob, opening a transport takes under 0.5s."""
        # This test is potentially brittle: under extremely high machine load
        # it could fail, but that is quite unlikely
        start_time = time.time()
        transport = self.get_transport()
        regular_time = time.time() - start_time
        self.assertTrue(regular_time < 0.5)
418
class FakeSocket(object):
419
"""Fake socket object used to test the SocketDelay wrapper without
426
def send(self, data, flags=0):
430
def sendall(self, data, flags=0):
434
def recv(self, size, flags=0):
435
if size < len(self._data):
436
result = self._data[:size]
437
self._data = self._data[size:]
445
class TestSocketDelay(TestCase):
449
if not paramiko_loaded:
450
raise TestSkipped('you must have paramiko to run this test')
452
def test_delay(self):
453
from bzrlib.transport.sftp import SocketDelay
454
sending = FakeSocket()
455
receiving = SocketDelay(sending, 0.1, bandwidth=1000000,
457
# check that simulated time is charged only per round-trip:
458
t1 = SocketDelay.simulated_time
459
receiving.send("connect1")
460
self.assertEqual(sending.recv(1024), "connect1")
461
t2 = SocketDelay.simulated_time
462
self.assertAlmostEqual(t2 - t1, 0.1)
463
receiving.send("connect2")
464
self.assertEqual(sending.recv(1024), "connect2")
465
sending.send("hello")
466
self.assertEqual(receiving.recv(1024), "hello")
467
t3 = SocketDelay.simulated_time
468
self.assertAlmostEqual(t3 - t2, 0.1)
469
sending.send("hello")
470
self.assertEqual(receiving.recv(1024), "hello")
471
sending.send("hello")
472
self.assertEqual(receiving.recv(1024), "hello")
473
sending.send("hello")
474
self.assertEqual(receiving.recv(1024), "hello")
475
t4 = SocketDelay.simulated_time
476
self.assertAlmostEqual(t4, t3)
478
def test_bandwidth(self):
479
from bzrlib.transport.sftp import SocketDelay
480
sending = FakeSocket()
481
receiving = SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
483
# check that simulated time is charged only per round-trip:
484
t1 = SocketDelay.simulated_time
485
receiving.send("connect")
486
self.assertEqual(sending.recv(1024), "connect")
487
sending.send("a" * 100)
488
self.assertEqual(receiving.recv(1024), "a" * 100)
489
t2 = SocketDelay.simulated_time
490
self.assertAlmostEqual(t2 - t1, 100 + 7)
34
-----BEGIN RSA PRIVATE KEY-----
35
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
36
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
37
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
38
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
39
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
40
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
41
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
42
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
43
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
44
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
45
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
46
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
47
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
48
-----END RSA PRIVATE KEY-----
52
class SingleListener (threading.Thread):
53
def __init__(self, callback):
54
threading.Thread.__init__(self)
55
self._callback = callback
56
self._socket = socket.socket()
57
self._socket.listen(1)
58
self.port = self._socket.getsockname()[1]
59
self.stop_event = threading.Event()
62
s, _ = self._socket.accept()
63
# now close the listen socket
65
self._callback(s, self.stop_event)
71
class TestCaseWithSFTPServer (TestCaseInTempDir):
73
Execute a test case with a stub SFTP server, serving files from the local
74
filesystem over the loopback network.
77
def _run_server(self, s, stop_event):
78
ssh_server = paramiko.Transport(s)
79
key_file = os.path.join(self._root, 'test_rsa.key')
80
file(key_file, 'w').write(STUB_SERVER_KEY)
81
host_key = paramiko.RSAKey.from_private_key_file(key_file)
82
ssh_server.add_server_key(host_key)
84
ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
85
event = threading.Event()
86
ssh_server.start_server(event, server)
91
TestCaseInTempDir.setUp(self)
92
self._root = self.test_dir
94
self._listener = SingleListener(self._run_server)
95
self._listener.setDaemon(True)
96
self._listener.start()
97
self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
100
self._listener.stop()
101
TestCaseInTempDir.tearDown(self)
104
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
107
def get_transport(self):
108
from bzrlib.transport.sftp import SFTPTransport
110
return SFTPTransport(url)
112
if not paramiko_loaded:
113
del SFTPTransportTest