22
import bzrlib.bzrdir as bzrdir
23
import bzrlib.errors as errors
24
from bzrlib.osutils import pathjoin, lexists, set_or_unset_env
25
from bzrlib.tests import TestCaseWithTransport, TestCase, TestSkipped
26
from bzrlib.tests.HttpServer import HttpServer
27
import bzrlib.transport
28
from bzrlib.transport import get_transport
29
import bzrlib.transport.http
30
from bzrlib.workingtree import WorkingTree
22
from bzrlib.tests import TestCaseInTempDir
23
from bzrlib.tests.test_transport import TestTransportMixIn
27
from stub_sftp import StubServer, StubSFTPServer
34
28
paramiko_loaded = True
35
29
except ImportError:
36
30
paramiko_loaded = False
39
def set_test_transport_to_sftp(testcase):
40
"""A helper to set transports on test case instances."""
41
from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
42
if getattr(testcase, '_get_remote_is_absolute', None) is None:
43
testcase._get_remote_is_absolute = True
44
if testcase._get_remote_is_absolute:
45
testcase.transport_server = SFTPAbsoluteServer
47
testcase.transport_server = SFTPHomeDirServer
48
testcase.transport_readonly_server = HttpServer
51
class TestCaseWithSFTPServer(TestCaseWithTransport):
52
"""A test case base class that provides a sftp server on localhost."""
34
-----BEGIN RSA PRIVATE KEY-----
35
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
36
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
37
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
38
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
39
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
40
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
41
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
42
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
43
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
44
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
45
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
46
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
47
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
48
-----END RSA PRIVATE KEY-----
52
class SingleListener (threading.Thread):
53
def __init__(self, callback):
54
threading.Thread.__init__(self)
55
self._callback = callback
56
self._socket = socket.socket()
57
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
58
self._socket.bind(('localhost', 0))
59
self._socket.listen(1)
60
self.port = self._socket.getsockname()[1]
61
self.stop_event = threading.Event()
64
s, _ = self._socket.accept()
65
# now close the listen socket
67
self._callback(s, self.stop_event)
73
class TestCaseWithSFTPServer (TestCaseInTempDir):
75
Execute a test case with a stub SFTP server, serving files from the local
76
filesystem over the loopback network.
79
def _run_server(self, s, stop_event):
80
ssh_server = paramiko.Transport(s)
81
key_file = os.path.join(self._root, 'test_rsa.key')
82
file(key_file, 'w').write(STUB_SERVER_KEY)
83
host_key = paramiko.RSAKey.from_private_key_file(key_file)
84
ssh_server.add_server_key(host_key)
86
ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
87
event = threading.Event()
88
ssh_server.start_server(event, server)
55
super(TestCaseWithSFTPServer, self).setUp()
56
if not paramiko_loaded:
57
raise TestSkipped('you must have paramiko to run this test')
58
set_test_transport_to_sftp(self)
60
def get_transport(self, path=None):
61
"""Return a transport relative to self._test_root."""
62
return bzrlib.transport.get_transport(self.get_url(path))
65
class SFTPLockTests (TestCaseWithSFTPServer):
93
TestCaseInTempDir.setUp(self)
94
self._root = self.test_dir
96
def delayed_setup(self):
97
# some tests are just stubs that call setUp and then immediately call
98
# tearDwon. so don't create the port listener until get_transport is
99
# called and we know we're in an actual test.
100
self._listener = SingleListener(self._run_server)
101
self._listener.setDaemon(True)
102
self._listener.start()
103
self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
107
self._listener.stop()
108
except AttributeError:
110
TestCaseInTempDir.tearDown(self)
113
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
117
def get_transport(self):
121
from bzrlib.transport.sftp import SFTPTransport
123
return SFTPTransport(url)
67
125
def test_sftp_locks(self):
68
126
from bzrlib.errors import LockError
92
def test_multiple_connections(self):
93
t = self.get_transport()
94
self.assertTrue('sftpserver - new connection' in self.get_server().logs)
95
self.get_server().logs = []
96
# The second request should reuse the first connection
97
# SingleListener only allows for a single connection,
98
# So the next line fails unless the connection is reused
99
t2 = self.get_transport()
100
self.assertEquals(self.get_server().logs, [])
103
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
104
"""Test the SFTP transport with homedir based relative paths."""
106
def test__remote_path(self):
107
t = self.get_transport()
108
# try what is currently used:
109
# remote path = self._abspath(relpath)
110
self.assertEqual(self.test_dir + '/relative', t._remote_path('relative'))
111
# we dont os.path.join because windows gives us the wrong path
112
root_segments = self.test_dir.split('/')
113
root_parent = '/'.join(root_segments[:-1])
114
# .. should be honoured
115
self.assertEqual(root_parent + '/sibling', t._remote_path('../sibling'))
116
# / should be illegal ?
117
### FIXME decide and then test for all transports. RBC20051208
120
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
121
"""Test the SFTP transport with homedir based relative paths."""
124
self._get_remote_is_absolute = False
125
super(SFTPTransportTestRelativeRoot, self).setUp()
127
def test__remote_path_relative_root(self):
128
# relative paths are preserved
129
t = self.get_transport('')
130
# the remote path should be ''
131
self.assertEqual('', t._path)
132
self.assertEqual('a', t._remote_path('a'))
135
151
class FakeSFTPTransport (object):
137
153
fake = FakeSFTPTransport()
140
class SFTPNonServerTest(TestCase):
143
if not paramiko_loaded:
144
raise TestSkipped('you must have paramiko to run this test')
156
class SFTPNonServerTest (unittest.TestCase):
146
157
def test_parse_url(self):
147
158
from bzrlib.transport.sftp import SFTPTransport
148
s = SFTPTransport('sftp://simple.example.com/home/source', clone_from=fake)
159
s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
149
160
self.assertEquals(s._host, 'simple.example.com')
150
self.assertEquals(s._port, None)
161
self.assertEquals(s._port, 22)
151
162
self.assertEquals(s._path, '/home/source')
152
self.failUnless(s._password is None)
154
self.assertEquals(s.base, 'sftp://simple.example.com/home/source/')
156
s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative', clone_from=fake)
163
self.assert_(s._password is None)
165
s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
157
166
self.assertEquals(s._host, 'example.com')
158
167
self.assertEquals(s._port, 2222)
159
168
self.assertEquals(s._username, 'robey')
160
169
self.assertEquals(s._password, 'h@t')
161
170
self.assertEquals(s._path, 'relative')
163
# Base should not keep track of the password
164
self.assertEquals(s.base, 'sftp://robey@example.com:2222/~/relative/')
166
def test_relpath(self):
167
from bzrlib.transport.sftp import SFTPTransport
168
from bzrlib.errors import PathNotChild
170
s = SFTPTransport('sftp://user@host.com/abs/path', clone_from=fake)
171
self.assertEquals(s.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
172
# Can't test this one, because we actually get an AssertionError
173
# TODO: Consider raising an exception rather than an assert
174
#self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com/abs/path/sub')
175
self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com/abs/path/sub')
176
self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com/abs/path/sub')
177
self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33/abs/path/sub')
178
self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/~/rel/path/sub')
180
# Make sure it works when we don't supply a username
181
s = SFTPTransport('sftp://host.com/abs/path', clone_from=fake)
182
self.assertEquals(s.relpath('sftp://host.com/abs/path/sub'), 'sub')
184
# Make sure it works when parts of the path will be url encoded
185
# TODO: These may be incorrect, we might need to urllib.urlencode() before
186
# we pass the paths into the SFTPTransport constructor
187
s = SFTPTransport('sftp://host.com/dev/,path', clone_from=fake)
188
self.assertEquals(s.relpath('sftp://host.com/dev/,path/sub'), 'sub')
189
s = SFTPTransport('sftp://host.com/dev/%path', clone_from=fake)
190
self.assertEquals(s.relpath('sftp://host.com/dev/%path/sub'), 'sub')
192
def test_parse_invalid_url(self):
193
from bzrlib.transport.sftp import SFTPTransport, TransportError
195
s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
197
self.fail('expected exception not raised')
198
except TransportError, e:
199
self.assertEquals(str(e),
201
'invalid port number ~janneke in url:\n'
202
'sftp://lilypond.org:~janneke/public_html/bzr/gub ')
204
def test_get_paramiko_vendor(self):
205
"""Test that if no 'ssh' is available we get builtin paramiko"""
206
from bzrlib.transport import ssh
207
# set '.' as the only location in the path, forcing no 'ssh' to exist
208
orig_vendor = ssh._ssh_vendor
209
orig_path = set_or_unset_env('PATH', '.')
211
# No vendor defined yet, query for one
212
ssh._ssh_vendor = None
213
vendor = ssh._get_ssh_vendor()
214
self.assertIsInstance(vendor, ssh.ParamikoVendor)
216
set_or_unset_env('PATH', orig_path)
217
ssh._ssh_vendor = orig_vendor
219
def test_abspath_root_sibling_server(self):
220
from bzrlib.transport.sftp import SFTPSiblingAbsoluteServer
221
server = SFTPSiblingAbsoluteServer()
224
transport = get_transport(server.get_url())
225
self.assertFalse(transport.abspath('/').endswith('/~/'))
226
self.assertTrue(transport.abspath('/').endswith('/'))
232
173
class SFTPBranchTest(TestCaseWithSFTPServer):
233
174
"""Test some stuff when accessing a bzr Branch over sftp"""
235
176
def test_lock_file(self):
236
# old format branches use a special lock file on sftp.
237
b = self.make_branch('', format=bzrdir.BzrDirFormat6())
238
b = bzrlib.branch.Branch.open(self.get_url())
177
"""Make sure that a Branch accessed over sftp tries to lock itself."""
178
from bzrlib.branch import Branch
181
b = Branch.initialize(self._sftp_url)
239
182
self.failUnlessExists('.bzr/')
240
183
self.failUnlessExists('.bzr/branch-format')
241
184
self.failUnlessExists('.bzr/branch-lock')
243
self.failIf(lexists('.bzr/branch-lock.write-lock'))
186
self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
245
188
self.failUnlessExists('.bzr/branch-lock.write-lock')
247
self.failIf(lexists('.bzr/branch-lock.write-lock'))
249
def test_push_support(self):
250
self.build_tree(['a/', 'a/foo'])
251
t = bzrdir.BzrDir.create_standalone_workingtree('a')
254
t.commit('foo', rev_id='a1')
256
b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
259
self.assertEquals(b2.revision_history(), ['a1'])
261
open('a/foo', 'wt').write('something new in foo\n')
262
t.commit('new', rev_id='a2')
265
self.assertEquals(b2.revision_history(), ['a1', 'a2'])
268
class SSHVendorConnection(TestCaseWithSFTPServer):
269
"""Test that the ssh vendors can all connect.
271
Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
273
We have 3 sftp implementations in the test suite:
274
'loopback': Doesn't use ssh, just uses a local socket. Most tests are
275
done this way to save the handshaking time, so it is not
277
'none': This uses paramiko's built-in ssh client and server, and layers
279
None: If 'ssh' exists on the machine, then it will be spawned as a
284
super(SSHVendorConnection, self).setUp()
285
from bzrlib.transport.sftp import SFTPFullAbsoluteServer
288
"""Just a wrapper so that when created, it will set _vendor"""
289
# SFTPFullAbsoluteServer can handle any vendor,
290
# it just needs to be set between the time it is instantiated
291
# and the time .setUp() is called
292
server = SFTPFullAbsoluteServer()
293
server._vendor = self._test_vendor
295
self._test_vendor = 'loopback'
296
self.transport_server = create_server
297
f = open('a_file', 'wb')
303
def set_vendor(self, vendor):
304
self._test_vendor = vendor
306
def test_connection_paramiko(self):
307
from bzrlib.transport import ssh
308
self.set_vendor(ssh.ParamikoVendor())
309
t = self.get_transport()
310
self.assertEqual('foobar\n', t.get('a_file').read())
312
def test_connection_vendor(self):
313
raise TestSkipped("We don't test spawning real ssh,"
314
" because it prompts for a password."
315
" Enable this test if we figure out"
316
" how to prevent this.")
317
self.set_vendor(None)
318
t = self.get_transport()
319
self.assertEqual('foobar\n', t.get('a_file').read())
322
class SSHVendorBadConnection(TestCaseWithTransport):
323
"""Test that the ssh vendors handle bad connection properly
325
We don't subclass TestCaseWithSFTPServer, because we don't actually
326
need an SFTP connection.
330
if not paramiko_loaded:
331
raise TestSkipped('you must have paramiko to run this test')
332
super(SSHVendorBadConnection, self).setUp()
333
import bzrlib.transport.ssh
335
# open a random port, so we know nobody else is using it
336
# but don't actually listen on the port.
338
s.bind(('localhost', 0))
339
self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
341
orig_vendor = bzrlib.transport.ssh._ssh_vendor
343
bzrlib.transport.ssh._ssh_vendor = orig_vendor
345
self.addCleanup(reset)
347
def set_vendor(self, vendor):
348
import bzrlib.transport.ssh
349
bzrlib.transport.ssh._ssh_vendor = vendor
351
def test_bad_connection_paramiko(self):
352
"""Test that a real connection attempt raises the right error"""
353
from bzrlib.transport import ssh
354
self.set_vendor(ssh.ParamikoVendor())
355
self.assertRaises(errors.ConnectionError,
356
bzrlib.transport.get_transport, self.bogus_url)
358
def test_bad_connection_ssh(self):
359
"""None => auto-detect vendor"""
360
self.set_vendor(None)
361
# This is how I would normally test the connection code
362
# it makes it very clear what we are testing.
363
# However, 'ssh' will create stipple on the output, so instead
364
# I'm using run_bzr_subprocess, and parsing the output
366
# t = bzrlib.transport.get_transport(self.bogus_url)
367
# except errors.ConnectionError:
370
# except errors.NameError, e:
371
# if 'SSHException' in str(e):
372
# raise TestSkipped('Known NameError bug in paramiko 1.6.1')
375
# self.fail('Excepted ConnectionError to be raised')
377
out, err = self.run_bzr_subprocess('log', self.bogus_url, retcode=3)
378
self.assertEqual('', out)
379
if "NameError: global name 'SSHException'" in err:
380
# We aren't fixing this bug, because it is a bug in
381
# paramiko, but we know about it, so we don't have to
383
raise TestSkipped('Known NameError bug with paramiko-1.6.1')
384
self.assertContainsRe(err, r'bzr: ERROR: Unable to connect to SSH host'
385
r' 127\.0\.0\.1:\d+; ')
388
class SFTPLatencyKnob(TestCaseWithSFTPServer):
389
"""Test that the testing SFTPServer's latency knob works."""
391
def test_latency_knob_slows_transport(self):
392
# change the latency knob to 500ms. We take about 40ms for a
393
# loopback connection ordinarily.
394
start_time = time.time()
395
self.get_server().add_latency = 0.5
396
transport = self.get_transport()
397
with_latency_knob_time = time.time() - start_time
398
self.assertTrue(with_latency_knob_time > 0.4)
400
def test_default(self):
401
# This test is potentially brittle: under extremely high machine load
402
# it could fail, but that is quite unlikely
403
start_time = time.time()
404
transport = self.get_transport()
405
regular_time = time.time() - start_time
406
self.assertTrue(regular_time < 0.5)
409
class FakeSocket(object):
410
"""Fake socket object used to test the SocketDelay wrapper without
417
def send(self, data, flags=0):
421
def sendall(self, data, flags=0):
425
def recv(self, size, flags=0):
426
if size < len(self._data):
427
result = self._data[:size]
428
self._data = self._data[size:]
436
class TestSocketDelay(TestCase):
440
if not paramiko_loaded:
441
raise TestSkipped('you must have paramiko to run this test')
443
def test_delay(self):
444
from bzrlib.transport.sftp import SocketDelay
445
sending = FakeSocket()
446
receiving = SocketDelay(sending, 0.1, bandwidth=1000000,
448
# check that simulated time is charged only per round-trip:
449
t1 = SocketDelay.simulated_time
450
receiving.send("connect1")
451
self.assertEqual(sending.recv(1024), "connect1")
452
t2 = SocketDelay.simulated_time
453
self.assertAlmostEqual(t2 - t1, 0.1)
454
receiving.send("connect2")
455
self.assertEqual(sending.recv(1024), "connect2")
456
sending.send("hello")
457
self.assertEqual(receiving.recv(1024), "hello")
458
t3 = SocketDelay.simulated_time
459
self.assertAlmostEqual(t3 - t2, 0.1)
460
sending.send("hello")
461
self.assertEqual(receiving.recv(1024), "hello")
462
sending.send("hello")
463
self.assertEqual(receiving.recv(1024), "hello")
464
sending.send("hello")
465
self.assertEqual(receiving.recv(1024), "hello")
466
t4 = SocketDelay.simulated_time
467
self.assertAlmostEqual(t4, t3)
469
def test_bandwidth(self):
470
from bzrlib.transport.sftp import SocketDelay
471
sending = FakeSocket()
472
receiving = SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
474
# check that simulated time is charged only per round-trip:
475
t1 = SocketDelay.simulated_time
476
receiving.send("connect")
477
self.assertEqual(sending.recv(1024), "connect")
478
sending.send("a" * 100)
479
self.assertEqual(receiving.recv(1024), "a" * 100)
480
t2 = SocketDelay.simulated_time
481
self.assertAlmostEqual(t2 - t1, 100 + 7)
190
self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
193
if not paramiko_loaded:
195
del SFTPTransportTest
196
del SFTPNonServerTest