32
34
paramiko_loaded = False
37
def set_test_transport_to_sftp(testcase):
38
"""A helper to set transports on test case instances."""
39
from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
40
if getattr(testcase, '_get_remote_is_absolute', None) is None:
41
testcase._get_remote_is_absolute = True
42
if testcase._get_remote_is_absolute:
43
testcase.transport_server = SFTPAbsoluteServer
45
testcase.transport_server = SFTPHomeDirServer
46
testcase.transport_readonly_server = bzrlib.transport.http.HttpServer
35
49
class TestCaseWithSFTPServer(TestCaseWithTransport):
36
50
"""A test case base class that provides a sftp server on localhost."""
39
if not paramiko_loaded:
40
raise TestSkipped('you must have paramiko to run this test')
41
53
super(TestCaseWithSFTPServer, self).setUp()
42
from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
43
if getattr(self, '_get_remote_is_absolute', None) is None:
44
self._get_remote_is_absolute = True
45
if self._get_remote_is_absolute:
46
self.transport_server = SFTPAbsoluteServer
48
self.transport_server = SFTPHomeDirServer
49
self.transport_readonly_server = bzrlib.transport.http.HttpServer
54
if not paramiko_loaded:
55
raise TestSkipped('you must have paramiko to run this test')
56
set_test_transport_to_sftp(self)
51
58
def get_transport(self, path=None):
52
59
"""Return a transport relative to self._test_root."""
342
349
raise TestSkipped('Known NameError bug with paramiko-1.6.1')
343
350
self.assertContainsRe(err, 'Connection error')
353
class SFTPLatencyKnob(TestCaseWithSFTPServer):
354
"""Test that the testing SFTPServer's latency knob works."""
356
def test_latency_knob_slows_transport(self):
357
# change the latency knob to 500ms. We take about 40ms for a
358
# loopback connection ordinarily.
359
start_time = time.time()
360
self.get_server().add_latency = 0.5
361
transport = self.get_transport()
362
with_latency_knob_time = time.time() - start_time
363
print with_latency_knob_time
364
self.assertTrue(with_latency_knob_time > 0.4)
366
def test_default(self):
367
# This test is potentially brittle: under extremely high machine load
368
# it could fail, but that is quite unlikely
369
start_time = time.time()
370
transport = self.get_transport()
371
regular_time = time.time() - start_time
372
self.assertTrue(regular_time < 0.5)
375
class FakeSocket(object):
376
"""Fake socket object used to test the SocketDelay wrapper without
383
def send(self, data, flags=0):
387
def sendall(self, data, flags=0):
391
def recv(self, size, flags=0):
392
if size < len(self._data):
393
result = self._data[:size]
394
self._data = self._data[size:]
402
class TestSocketDelay(TestCase):
407
def test_delay(self):
408
from bzrlib.transport.sftp import SocketDelay
409
sending = FakeSocket()
410
receiving = SocketDelay(sending, 0.1, bandwidth=1000000,
412
# check that simulated time is charged only per round-trip:
413
t1 = SocketDelay.simulated_time
414
receiving.send("connect1")
415
self.assertEqual(sending.recv(1024), "connect1")
416
t2 = SocketDelay.simulated_time
417
self.assertAlmostEqual(t2 - t1, 0.1)
418
receiving.send("connect2")
419
self.assertEqual(sending.recv(1024), "connect2")
420
sending.send("hello")
421
self.assertEqual(receiving.recv(1024), "hello")
422
t3 = SocketDelay.simulated_time
423
self.assertAlmostEqual(t3 - t2, 0.1)
424
sending.send("hello")
425
self.assertEqual(receiving.recv(1024), "hello")
426
sending.send("hello")
427
self.assertEqual(receiving.recv(1024), "hello")
428
sending.send("hello")
429
self.assertEqual(receiving.recv(1024), "hello")
430
t4 = SocketDelay.simulated_time
431
self.assertAlmostEqual(t4, t3)
433
def test_bandwidth(self):
434
from bzrlib.transport.sftp import SocketDelay
435
sending = FakeSocket()
436
receiving = SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
438
# check that simulated time is charged only per round-trip:
439
t1 = SocketDelay.simulated_time
440
receiving.send("connect")
441
self.assertEqual(sending.recv(1024), "connect")
442
sending.send("a" * 100)
443
self.assertEqual(receiving.recv(1024), "a" * 100)
444
t2 = SocketDelay.simulated_time
445
self.assertAlmostEqual(t2 - t1, 100 + 7)