~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp.py

  • Committer: Robert Collins
  • Date: 2005-11-28 05:13:41 UTC
  • mfrom: (1185.33.54 merge-recovered)
  • Revision ID: robertc@robertcollins.net-20051128051341-059936f2f29a12c8
Merge from Martin. Adjust check to work with HTTP again.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
2
 
#
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
17
import os
18
18
import socket
19
19
import threading
20
 
import time
 
20
import unittest
21
21
 
22
 
import bzrlib.bzrdir as bzrdir
23
 
import bzrlib.errors as errors
24
 
from bzrlib.osutils import pathjoin, lexists, set_or_unset_env
25
 
from bzrlib.tests import TestCaseWithTransport, TestCase, TestSkipped
26
 
from bzrlib.tests.HttpServer import HttpServer
27
 
import bzrlib.transport
28
 
from bzrlib.transport import get_transport
29
 
import bzrlib.transport.http
30
 
from bzrlib.workingtree import WorkingTree
 
22
from bzrlib.tests import TestCaseInTempDir
 
23
from bzrlib.tests.test_transport import TestTransportMixIn
31
24
 
32
25
try:
33
26
    import paramiko
 
27
    from stub_sftp import StubServer, StubSFTPServer
34
28
    paramiko_loaded = True
35
29
except ImportError:
36
30
    paramiko_loaded = False
37
31
 
38
32
 
39
 
def set_test_transport_to_sftp(testcase):
40
 
    """A helper to set transports on test case instances."""
41
 
    from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
42
 
    if getattr(testcase, '_get_remote_is_absolute', None) is None:
43
 
        testcase._get_remote_is_absolute = True
44
 
    if testcase._get_remote_is_absolute:
45
 
        testcase.transport_server = SFTPAbsoluteServer
46
 
    else:
47
 
        testcase.transport_server = SFTPHomeDirServer
48
 
    testcase.transport_readonly_server = HttpServer
49
 
 
50
 
 
51
 
class TestCaseWithSFTPServer(TestCaseWithTransport):
52
 
    """A test case base class that provides a sftp server on localhost."""
 
33
STUB_SERVER_KEY = """
 
34
-----BEGIN RSA PRIVATE KEY-----
 
35
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
36
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
37
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
38
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
39
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
40
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
41
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
42
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
43
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
44
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
45
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
46
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
47
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
48
-----END RSA PRIVATE KEY-----
 
49
"""
 
50
    
 
51
 
 
52
class SingleListener (threading.Thread):
 
53
    def __init__(self, callback):
 
54
        threading.Thread.__init__(self)
 
55
        self._callback = callback
 
56
        self._socket = socket.socket()
 
57
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
58
        self._socket.bind(('localhost', 0))
 
59
        self._socket.listen(1)
 
60
        self.port = self._socket.getsockname()[1]
 
61
        self.stop_event = threading.Event()
 
62
 
 
63
    def run(self):
 
64
        s, _ = self._socket.accept()
 
65
        # now close the listen socket
 
66
        self._socket.close()
 
67
        self._callback(s, self.stop_event)
 
68
    
 
69
    def stop(self):
 
70
        self.stop_event.set()
 
71
        
 
72
        
 
73
class TestCaseWithSFTPServer (TestCaseInTempDir):
 
74
    """
 
75
    Execute a test case with a stub SFTP server, serving files from the local
 
76
    filesystem over the loopback network.
 
77
    """
 
78
    
 
79
    def _run_server(self, s, stop_event):
 
80
        ssh_server = paramiko.Transport(s)
 
81
        key_file = os.path.join(self._root, 'test_rsa.key')
 
82
        file(key_file, 'w').write(STUB_SERVER_KEY)
 
83
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
84
        ssh_server.add_server_key(host_key)
 
85
        server = StubServer()
 
86
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
 
87
        event = threading.Event()
 
88
        ssh_server.start_server(event, server)
 
89
        event.wait(5.0)
 
90
        stop_event.wait(30.0)
53
91
 
54
92
    def setUp(self):
55
 
        super(TestCaseWithSFTPServer, self).setUp()
56
 
        if not paramiko_loaded:
57
 
            raise TestSkipped('you must have paramiko to run this test')
58
 
        set_test_transport_to_sftp(self) 
59
 
 
60
 
    def get_transport(self, path=None):
61
 
        """Return a transport relative to self._test_root."""
62
 
        return bzrlib.transport.get_transport(self.get_url(path))
63
 
 
64
 
 
65
 
class SFTPLockTests (TestCaseWithSFTPServer):
 
93
        TestCaseInTempDir.setUp(self)
 
94
        self._root = self.test_dir
 
95
 
 
96
    def delayed_setup(self):
 
97
        # some tests are just stubs that call setUp and then immediately call
 
98
        # tearDwon.  so don't create the port listener until get_transport is
 
99
        # called and we know we're in an actual test.
 
100
        self._listener = SingleListener(self._run_server)
 
101
        self._listener.setDaemon(True)
 
102
        self._listener.start()        
 
103
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
 
104
        
 
105
    def tearDown(self):
 
106
        try:
 
107
            self._listener.stop()
 
108
        except AttributeError:
 
109
            pass
 
110
        TestCaseInTempDir.tearDown(self)
 
111
 
 
112
        
 
113
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
 
114
    readonly = False
 
115
    setup = True
 
116
 
 
117
    def get_transport(self):
 
118
        if self.setup:
 
119
            self.delayed_setup()
 
120
            self.setup = False
 
121
        from bzrlib.transport.sftp import SFTPTransport
 
122
        url = self._sftp_url
 
123
        return SFTPTransport(url)
66
124
 
67
125
    def test_sftp_locks(self):
68
126
        from bzrlib.errors import LockError
76
134
        self.assertRaises(LockError, t.lock_write, 'bogus')
77
135
 
78
136
        l.unlock()
79
 
        self.failIf(lexists('bogus.write-lock'))
 
137
        self.failIf(os.path.lexists('bogus.write-lock'))
80
138
 
81
139
        open('something.write-lock', 'wb').write('fake lock\n')
82
140
        self.assertRaises(LockError, t.lock_write, 'something')
89
147
        l.unlock()
90
148
        l2.unlock()
91
149
 
92
 
    def test_multiple_connections(self):
93
 
        t = self.get_transport()
94
 
        self.assertTrue('sftpserver - new connection' in self.get_server().logs)
95
 
        self.get_server().logs = []
96
 
        # The second request should reuse the first connection
97
 
        # SingleListener only allows for a single connection,
98
 
        # So the next line fails unless the connection is reused
99
 
        t2 = self.get_transport()
100
 
        self.assertEquals(self.get_server().logs, [])
101
 
 
102
 
 
103
 
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
104
 
    """Test the SFTP transport with homedir based relative paths."""
105
 
 
106
 
    def test__remote_path(self):
107
 
        t = self.get_transport()
108
 
        # try what is currently used:
109
 
        # remote path = self._abspath(relpath)
110
 
        self.assertEqual(self.test_dir + '/relative', t._remote_path('relative'))
111
 
        # we dont os.path.join because windows gives us the wrong path
112
 
        root_segments = self.test_dir.split('/')
113
 
        root_parent = '/'.join(root_segments[:-1])
114
 
        # .. should be honoured
115
 
        self.assertEqual(root_parent + '/sibling', t._remote_path('../sibling'))
116
 
        # /  should be illegal ?
117
 
        ### FIXME decide and then test for all transports. RBC20051208
118
 
 
119
 
 
120
 
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
121
 
    """Test the SFTP transport with homedir based relative paths."""
122
 
 
123
 
    def setUp(self):
124
 
        self._get_remote_is_absolute = False
125
 
        super(SFTPTransportTestRelativeRoot, self).setUp()
126
 
 
127
 
    def test__remote_path_relative_root(self):
128
 
        # relative paths are preserved
129
 
        t = self.get_transport('')
130
 
        # the remote path should be ''
131
 
        self.assertEqual('', t._path)
132
 
        self.assertEqual('a', t._remote_path('a'))
133
 
 
134
150
 
135
151
class FakeSFTPTransport (object):
136
152
    _sftp = object()
137
153
fake = FakeSFTPTransport()
138
154
 
139
155
 
140
 
class SFTPNonServerTest(TestCase):
141
 
    def setUp(self):
142
 
        TestCase.setUp(self)
143
 
        if not paramiko_loaded:
144
 
            raise TestSkipped('you must have paramiko to run this test')
145
 
 
 
156
class SFTPNonServerTest (unittest.TestCase):
146
157
    def test_parse_url(self):
147
158
        from bzrlib.transport.sftp import SFTPTransport
148
 
        s = SFTPTransport('sftp://simple.example.com/home/source', clone_from=fake)
 
159
        s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
149
160
        self.assertEquals(s._host, 'simple.example.com')
150
 
        self.assertEquals(s._port, None)
 
161
        self.assertEquals(s._port, 22)
151
162
        self.assertEquals(s._path, '/home/source')
152
 
        self.failUnless(s._password is None)
153
 
 
154
 
        self.assertEquals(s.base, 'sftp://simple.example.com/home/source/')
155
 
 
156
 
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative', clone_from=fake)
 
163
        self.assert_(s._password is None)
 
164
        
 
165
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
157
166
        self.assertEquals(s._host, 'example.com')
158
167
        self.assertEquals(s._port, 2222)
159
168
        self.assertEquals(s._username, 'robey')
160
169
        self.assertEquals(s._password, 'h@t')
161
170
        self.assertEquals(s._path, 'relative')
162
 
 
163
 
        # Base should not keep track of the password
164
 
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/~/relative/')
165
 
 
166
 
    def test_relpath(self):
167
 
        from bzrlib.transport.sftp import SFTPTransport
168
 
        from bzrlib.errors import PathNotChild
169
 
 
170
 
        s = SFTPTransport('sftp://user@host.com/abs/path', clone_from=fake)
171
 
        self.assertEquals(s.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
172
 
        # Can't test this one, because we actually get an AssertionError
173
 
        # TODO: Consider raising an exception rather than an assert
174
 
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com/abs/path/sub')
175
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com/abs/path/sub')
176
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com/abs/path/sub')
177
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33/abs/path/sub')
178
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/~/rel/path/sub')
179
 
 
180
 
        # Make sure it works when we don't supply a username
181
 
        s = SFTPTransport('sftp://host.com/abs/path', clone_from=fake)
182
 
        self.assertEquals(s.relpath('sftp://host.com/abs/path/sub'), 'sub')
183
 
 
184
 
        # Make sure it works when parts of the path will be url encoded
185
 
        # TODO: These may be incorrect, we might need to urllib.urlencode() before
186
 
        # we pass the paths into the SFTPTransport constructor
187
 
        s = SFTPTransport('sftp://host.com/dev/,path', clone_from=fake)
188
 
        self.assertEquals(s.relpath('sftp://host.com/dev/,path/sub'), 'sub')
189
 
        s = SFTPTransport('sftp://host.com/dev/%path', clone_from=fake)
190
 
        self.assertEquals(s.relpath('sftp://host.com/dev/%path/sub'), 'sub')
191
 
 
192
 
    def test_parse_invalid_url(self):
193
 
        from bzrlib.transport.sftp import SFTPTransport, TransportError
194
 
        try:
195
 
            s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
196
 
                              clone_from=fake)
197
 
            self.fail('expected exception not raised')
198
 
        except TransportError, e:
199
 
            self.assertEquals(str(e),
200
 
                    'Transport error: '
201
 
                    'invalid port number ~janneke in url:\n'
202
 
                    'sftp://lilypond.org:~janneke/public_html/bzr/gub ')
203
 
 
204
 
    def test_get_paramiko_vendor(self):
205
 
        """Test that if no 'ssh' is available we get builtin paramiko"""
206
 
        from bzrlib.transport import ssh
207
 
        # set '.' as the only location in the path, forcing no 'ssh' to exist
208
 
        orig_vendor = ssh._ssh_vendor
209
 
        orig_path = set_or_unset_env('PATH', '.')
210
 
        try:
211
 
            # No vendor defined yet, query for one
212
 
            ssh._ssh_vendor = None
213
 
            vendor = ssh._get_ssh_vendor()
214
 
            self.assertIsInstance(vendor, ssh.ParamikoVendor)
215
 
        finally:
216
 
            set_or_unset_env('PATH', orig_path)
217
 
            ssh._ssh_vendor = orig_vendor
218
 
 
219
 
    def test_abspath_root_sibling_server(self):
220
 
        from bzrlib.transport.sftp import SFTPSiblingAbsoluteServer
221
 
        server = SFTPSiblingAbsoluteServer()
222
 
        server.setUp()
223
 
        try:
224
 
            transport = get_transport(server.get_url())
225
 
            self.assertFalse(transport.abspath('/').endswith('/~/'))
226
 
            self.assertTrue(transport.abspath('/').endswith('/'))
227
 
            del transport
228
 
        finally:
229
 
            server.tearDown()
230
 
 
 
171
        
231
172
 
232
173
class SFTPBranchTest(TestCaseWithSFTPServer):
233
174
    """Test some stuff when accessing a bzr Branch over sftp"""
234
175
 
235
176
    def test_lock_file(self):
236
 
        # old format branches use a special lock file on sftp.
237
 
        b = self.make_branch('', format=bzrdir.BzrDirFormat6())
238
 
        b = bzrlib.branch.Branch.open(self.get_url())
 
177
        """Make sure that a Branch accessed over sftp tries to lock itself."""
 
178
        from bzrlib.branch import Branch
 
179
 
 
180
        self.delayed_setup()
 
181
        b = Branch.initialize(self._sftp_url)
239
182
        self.failUnlessExists('.bzr/')
240
183
        self.failUnlessExists('.bzr/branch-format')
241
184
        self.failUnlessExists('.bzr/branch-lock')
242
185
 
243
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
186
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
244
187
        b.lock_write()
245
188
        self.failUnlessExists('.bzr/branch-lock.write-lock')
246
189
        b.unlock()
247
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
248
 
 
249
 
    def test_push_support(self):
250
 
        self.build_tree(['a/', 'a/foo'])
251
 
        t = bzrdir.BzrDir.create_standalone_workingtree('a')
252
 
        b = t.branch
253
 
        t.add('foo')
254
 
        t.commit('foo', rev_id='a1')
255
 
 
256
 
        b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
257
 
        b2.pull(b)
258
 
 
259
 
        self.assertEquals(b2.revision_history(), ['a1'])
260
 
 
261
 
        open('a/foo', 'wt').write('something new in foo\n')
262
 
        t.commit('new', rev_id='a2')
263
 
        b2.pull(b)
264
 
 
265
 
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
266
 
 
267
 
 
268
 
class SSHVendorConnection(TestCaseWithSFTPServer):
269
 
    """Test that the ssh vendors can all connect.
270
 
 
271
 
    Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
272
 
 
273
 
    We have 3 sftp implementations in the test suite:
274
 
      'loopback': Doesn't use ssh, just uses a local socket. Most tests are
275
 
                  done this way to save the handshaking time, so it is not
276
 
                  tested again here
277
 
      'none':     This uses paramiko's built-in ssh client and server, and layers
278
 
                  sftp on top of it.
279
 
      None:       If 'ssh' exists on the machine, then it will be spawned as a
280
 
                  child process.
281
 
    """
282
 
    
283
 
    def setUp(self):
284
 
        super(SSHVendorConnection, self).setUp()
285
 
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
286
 
 
287
 
        def create_server():
288
 
            """Just a wrapper so that when created, it will set _vendor"""
289
 
            # SFTPFullAbsoluteServer can handle any vendor,
290
 
            # it just needs to be set between the time it is instantiated
291
 
            # and the time .setUp() is called
292
 
            server = SFTPFullAbsoluteServer()
293
 
            server._vendor = self._test_vendor
294
 
            return server
295
 
        self._test_vendor = 'loopback'
296
 
        self.transport_server = create_server
297
 
        f = open('a_file', 'wb')
298
 
        try:
299
 
            f.write('foobar\n')
300
 
        finally:
301
 
            f.close()
302
 
 
303
 
    def set_vendor(self, vendor):
304
 
        self._test_vendor = vendor
305
 
 
306
 
    def test_connection_paramiko(self):
307
 
        from bzrlib.transport import ssh
308
 
        self.set_vendor(ssh.ParamikoVendor())
309
 
        t = self.get_transport()
310
 
        self.assertEqual('foobar\n', t.get('a_file').read())
311
 
 
312
 
    def test_connection_vendor(self):
313
 
        raise TestSkipped("We don't test spawning real ssh,"
314
 
                          " because it prompts for a password."
315
 
                          " Enable this test if we figure out"
316
 
                          " how to prevent this.")
317
 
        self.set_vendor(None)
318
 
        t = self.get_transport()
319
 
        self.assertEqual('foobar\n', t.get('a_file').read())
320
 
 
321
 
 
322
 
class SSHVendorBadConnection(TestCaseWithTransport):
323
 
    """Test that the ssh vendors handle bad connection properly
324
 
 
325
 
    We don't subclass TestCaseWithSFTPServer, because we don't actually
326
 
    need an SFTP connection.
327
 
    """
328
 
 
329
 
    def setUp(self):
330
 
        if not paramiko_loaded:
331
 
            raise TestSkipped('you must have paramiko to run this test')
332
 
        super(SSHVendorBadConnection, self).setUp()
333
 
        import bzrlib.transport.ssh
334
 
 
335
 
        # open a random port, so we know nobody else is using it
336
 
        # but don't actually listen on the port.
337
 
        s = socket.socket()
338
 
        s.bind(('localhost', 0))
339
 
        self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
340
 
 
341
 
        orig_vendor = bzrlib.transport.ssh._ssh_vendor
342
 
        def reset():
343
 
            bzrlib.transport.ssh._ssh_vendor = orig_vendor
344
 
            s.close()
345
 
        self.addCleanup(reset)
346
 
 
347
 
    def set_vendor(self, vendor):
348
 
        import bzrlib.transport.ssh
349
 
        bzrlib.transport.ssh._ssh_vendor = vendor
350
 
 
351
 
    def test_bad_connection_paramiko(self):
352
 
        """Test that a real connection attempt raises the right error"""
353
 
        from bzrlib.transport import ssh
354
 
        self.set_vendor(ssh.ParamikoVendor())
355
 
        self.assertRaises(errors.ConnectionError,
356
 
                          bzrlib.transport.get_transport, self.bogus_url)
357
 
 
358
 
    def test_bad_connection_ssh(self):
359
 
        """None => auto-detect vendor"""
360
 
        self.set_vendor(None)
361
 
        # This is how I would normally test the connection code
362
 
        # it makes it very clear what we are testing.
363
 
        # However, 'ssh' will create stipple on the output, so instead
364
 
        # I'm using run_bzr_subprocess, and parsing the output
365
 
        # try:
366
 
        #     t = bzrlib.transport.get_transport(self.bogus_url)
367
 
        # except errors.ConnectionError:
368
 
        #     # Correct error
369
 
        #     pass
370
 
        # except errors.NameError, e:
371
 
        #     if 'SSHException' in str(e):
372
 
        #         raise TestSkipped('Known NameError bug in paramiko 1.6.1')
373
 
        #     raise
374
 
        # else:
375
 
        #     self.fail('Excepted ConnectionError to be raised')
376
 
 
377
 
        out, err = self.run_bzr_subprocess('log', self.bogus_url, retcode=3)
378
 
        self.assertEqual('', out)
379
 
        if "NameError: global name 'SSHException'" in err:
380
 
            # We aren't fixing this bug, because it is a bug in
381
 
            # paramiko, but we know about it, so we don't have to
382
 
            # fail the test
383
 
            raise TestSkipped('Known NameError bug with paramiko-1.6.1')
384
 
        self.assertContainsRe(err, r'bzr: ERROR: Unable to connect to SSH host'
385
 
                                   r' 127\.0\.0\.1:\d+; ')
386
 
 
387
 
 
388
 
class SFTPLatencyKnob(TestCaseWithSFTPServer):
389
 
    """Test that the testing SFTPServer's latency knob works."""
390
 
 
391
 
    def test_latency_knob_slows_transport(self):
392
 
        # change the latency knob to 500ms. We take about 40ms for a 
393
 
        # loopback connection ordinarily.
394
 
        start_time = time.time()
395
 
        self.get_server().add_latency = 0.5
396
 
        transport = self.get_transport()
397
 
        with_latency_knob_time = time.time() - start_time
398
 
        self.assertTrue(with_latency_knob_time > 0.4)
399
 
 
400
 
    def test_default(self):
401
 
        # This test is potentially brittle: under extremely high machine load
402
 
        # it could fail, but that is quite unlikely
403
 
        start_time = time.time()
404
 
        transport = self.get_transport()
405
 
        regular_time = time.time() - start_time
406
 
        self.assertTrue(regular_time < 0.5)
407
 
 
408
 
 
409
 
class FakeSocket(object):
410
 
    """Fake socket object used to test the SocketDelay wrapper without
411
 
    using a real socket.
412
 
    """
413
 
 
414
 
    def __init__(self):
415
 
        self._data = ""
416
 
 
417
 
    def send(self, data, flags=0):
418
 
        self._data += data
419
 
        return len(data)
420
 
 
421
 
    def sendall(self, data, flags=0):
422
 
        self._data += data
423
 
        return len(data)
424
 
 
425
 
    def recv(self, size, flags=0):
426
 
        if size < len(self._data):
427
 
            result = self._data[:size]
428
 
            self._data = self._data[size:]
429
 
            return result
430
 
        else:
431
 
            result = self._data
432
 
            self._data = ""
433
 
            return result
434
 
 
435
 
 
436
 
class TestSocketDelay(TestCase):
437
 
 
438
 
    def setUp(self):
439
 
        TestCase.setUp(self)
440
 
        if not paramiko_loaded:
441
 
            raise TestSkipped('you must have paramiko to run this test')
442
 
 
443
 
    def test_delay(self):
444
 
        from bzrlib.transport.sftp import SocketDelay
445
 
        sending = FakeSocket()
446
 
        receiving = SocketDelay(sending, 0.1, bandwidth=1000000,
447
 
                                really_sleep=False)
448
 
        # check that simulated time is charged only per round-trip:
449
 
        t1 = SocketDelay.simulated_time
450
 
        receiving.send("connect1")
451
 
        self.assertEqual(sending.recv(1024), "connect1")
452
 
        t2 = SocketDelay.simulated_time
453
 
        self.assertAlmostEqual(t2 - t1, 0.1)
454
 
        receiving.send("connect2")
455
 
        self.assertEqual(sending.recv(1024), "connect2")
456
 
        sending.send("hello")
457
 
        self.assertEqual(receiving.recv(1024), "hello")
458
 
        t3 = SocketDelay.simulated_time
459
 
        self.assertAlmostEqual(t3 - t2, 0.1)
460
 
        sending.send("hello")
461
 
        self.assertEqual(receiving.recv(1024), "hello")
462
 
        sending.send("hello")
463
 
        self.assertEqual(receiving.recv(1024), "hello")
464
 
        sending.send("hello")
465
 
        self.assertEqual(receiving.recv(1024), "hello")
466
 
        t4 = SocketDelay.simulated_time
467
 
        self.assertAlmostEqual(t4, t3)
468
 
 
469
 
    def test_bandwidth(self):
470
 
        from bzrlib.transport.sftp import SocketDelay
471
 
        sending = FakeSocket()
472
 
        receiving = SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
473
 
                                really_sleep=False)
474
 
        # check that simulated time is charged only per round-trip:
475
 
        t1 = SocketDelay.simulated_time
476
 
        receiving.send("connect")
477
 
        self.assertEqual(sending.recv(1024), "connect")
478
 
        sending.send("a" * 100)
479
 
        self.assertEqual(receiving.recv(1024), "a" * 100)
480
 
        t2 = SocketDelay.simulated_time
481
 
        self.assertAlmostEqual(t2 - t1, 100 + 7)
482
 
 
483
 
 
 
190
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
 
191
 
 
192
 
 
193
if not paramiko_loaded:
 
194
    # TODO: Skip these
 
195
    del SFTPTransportTest
 
196
    del SFTPNonServerTest
 
197
    del SFTPBranchTest