~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp_transport.py

Merge updated set_parents api.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
import os
18
18
import socket
19
19
import threading
 
20
import time
20
21
 
21
22
import bzrlib.bzrdir as bzrdir
22
23
import bzrlib.errors as errors
23
24
from bzrlib.osutils import pathjoin, lexists
24
25
from bzrlib.tests import TestCaseWithTransport, TestCase, TestSkipped
25
26
import bzrlib.transport
 
27
import bzrlib.transport.http
26
28
from bzrlib.workingtree import WorkingTree
27
29
 
28
30
try:
32
34
    paramiko_loaded = False
33
35
 
34
36
 
 
37
def set_test_transport_to_sftp(testcase):
    """Configure a test case instance to run against the SFTP test servers.

    When ``testcase`` has no ``_get_remote_is_absolute`` attribute (or the
    attribute is None) it is set to True.  ``transport_server`` then becomes
    SFTPAbsoluteServer if the flag is true and SFTPHomeDirServer otherwise.
    ``transport_readonly_server`` is always set to the HTTP test server.
    """
    # Function-scope import: presumably kept local so that this module can be
    # imported when the sftp transport's dependencies are missing -- confirm.
    from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer

    use_absolute = getattr(testcase, '_get_remote_is_absolute', None)
    if use_absolute is None:
        # Default to absolute remote paths when the test case has no opinion.
        use_absolute = True
        testcase._get_remote_is_absolute = use_absolute

    if use_absolute:
        server_class = SFTPAbsoluteServer
    else:
        server_class = SFTPHomeDirServer
    testcase.transport_server = server_class
    testcase.transport_readonly_server = bzrlib.transport.http.HttpServer
 
47
 
 
48
 
35
49
class TestCaseWithSFTPServer(TestCaseWithTransport):
36
50
    """A test case base class that provides a sftp server on localhost."""
37
51
 
38
52
    def setUp(self):
39
 
        if not paramiko_loaded:
40
 
            raise TestSkipped('you must have paramiko to run this test')
41
53
        super(TestCaseWithSFTPServer, self).setUp()
42
 
        from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
43
 
        if getattr(self, '_get_remote_is_absolute', None) is None:
44
 
            self._get_remote_is_absolute = True
45
 
        if self._get_remote_is_absolute:
46
 
            self.transport_server = SFTPAbsoluteServer
47
 
        else:
48
 
            self.transport_server = SFTPHomeDirServer
49
 
        self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
54
        if not paramiko_loaded:
 
55
            raise TestSkipped('you must have paramiko to run this test')
 
56
        set_test_transport_to_sftp(self) 
50
57
 
51
58
    def get_transport(self, path=None):
52
59
        """Return a transport relative to self._test_root."""
342
349
            raise TestSkipped('Known NameError bug with paramiko-1.6.1')
343
350
        self.assertContainsRe(err, 'Connection error')
344
351
 
 
352
 
 
353
class SFTPLatencyKnob(TestCaseWithSFTPServer):
    """Test that the testing SFTPServer's latency knob works."""

    def test_latency_knob_slows_transport(self):
        """Setting add_latency on the server makes get_transport() slower."""
        # change the latency knob to 500ms. We take about 40ms for a
        # loopback connection ordinarily.
        start_time = time.time()
        self.get_server().add_latency = 0.5
        # Only the elapsed time is of interest; the transport is not used.
        self.get_transport()
        with_latency_knob_time = time.time() - start_time
        # No debug print here: tests should not write to stdout.
        self.assertTrue(with_latency_knob_time > 0.4)

    def test_default(self):
        """Without touching the knob, get_transport() finishes quickly."""
        # This test is potentially brittle: under extremely high machine load
        # it could fail, but that is quite unlikely
        start_time = time.time()
        # Only the elapsed time is of interest; the transport is not used.
        self.get_transport()
        regular_time = time.time() - start_time
        self.assertTrue(regular_time < 0.5)
 
373
 
 
374
 
 
375
class FakeSocket(object):
    """In-memory stand-in for a socket, used to test the SocketDelay wrapper.

    Everything passed to send() or sendall() is appended to a single string
    buffer, and recv() hands that buffer back from the front.  No real
    socket is involved.
    """

    def __init__(self):
        # Data written so far and not yet consumed by recv().
        self._data = ""

    def _append(self, data):
        """Add ``data`` to the end of the buffer and return its length."""
        self._data = self._data + data
        return len(data)

    def send(self, data, flags=0):
        """Buffer ``data`` and return its length; ``flags`` is ignored."""
        return self._append(data)

    def sendall(self, data, flags=0):
        """Buffer ``data`` and return its length; ``flags`` is ignored."""
        return self._append(data)

    def recv(self, size, flags=0):
        """Remove and return up to ``size`` characters from the buffer.

        Returns the whole buffer (possibly the empty string) when fewer than
        ``size`` characters are pending.  ``flags`` is ignored.
        """
        if size >= len(self._data):
            # Request covers everything pending: drain the buffer.
            pending = self._data
            self._data = ""
            return pending
        chunk = self._data[:size]
        self._data = self._data[size:]
        return chunk
 
400
 
 
401
 
 
402
class TestSocketDelay(TestCase):
    """Tests for SocketDelay wrapped around a FakeSocket.

    Both tests pass really_sleep=False and compare differences in the
    class-level SocketDelay.simulated_time counter instead of wall-clock time.
    """

    def setUp(self):
        TestCase.setUp(self)

    def test_delay(self):
        """Simulated time grows by 0.1 once per change of direction."""
        from bzrlib.transport.sftp import SocketDelay
        raw_sock = FakeSocket()
        delayed_sock = SocketDelay(raw_sock, 0.1, bandwidth=1000000,
                                   really_sleep=False)
        # check that simulated time is charged only per round-trip:
        before_first_send = SocketDelay.simulated_time
        delayed_sock.send("connect1")
        self.assertEqual(raw_sock.recv(1024), "connect1")
        after_first_send = SocketDelay.simulated_time
        self.assertAlmostEqual(after_first_send - before_first_send, 0.1)
        # A second send in the same direction, then the first receive.
        delayed_sock.send("connect2")
        self.assertEqual(raw_sock.recv(1024), "connect2")
        raw_sock.send("hello")
        self.assertEqual(delayed_sock.recv(1024), "hello")
        after_first_recv = SocketDelay.simulated_time
        self.assertAlmostEqual(after_first_recv - after_first_send, 0.1)
        # Three further receives in a row must not add any simulated time.
        raw_sock.send("hello")
        self.assertEqual(delayed_sock.recv(1024), "hello")
        raw_sock.send("hello")
        self.assertEqual(delayed_sock.recv(1024), "hello")
        raw_sock.send("hello")
        self.assertEqual(delayed_sock.recv(1024), "hello")
        after_more_recvs = SocketDelay.simulated_time
        self.assertAlmostEqual(after_more_recvs, after_first_recv)

    def test_bandwidth(self):
        """Simulated time reflects the configured bandwidth limit."""
        from bzrlib.transport.sftp import SocketDelay
        raw_sock = FakeSocket()
        delayed_sock = SocketDelay(raw_sock, 0, bandwidth=8.0/(1024*1024),
                                   really_sleep=False)
        # The second argument is 0 here (0.1 in test_delay), so presumably
        # only the bandwidth limit contributes simulated time -- confirm
        # against SocketDelay.
        before = SocketDelay.simulated_time
        delayed_sock.send("connect")
        self.assertEqual(raw_sock.recv(1024), "connect")
        raw_sock.send("a" * 100)
        self.assertEqual(delayed_sock.recv(1024), "a" * 100)
        after = SocketDelay.simulated_time
        # 100 + 7 presumably matches the 100 received characters plus the 7
        # characters of "connect" -- units are defined by SocketDelay.
        self.assertAlmostEqual(after - before, 100 + 7)
 
446
 
 
447