~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp.py

doc

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Robey Pointer <robey@lag.net>
2
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
 
#
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
 
2
 
4
3
# This program is free software; you can redistribute it and/or modify
5
4
# it under the terms of the GNU General Public License as published by
6
5
# the Free Software Foundation; either version 2 of the License, or
7
6
# (at your option) any later version.
8
 
#
 
7
 
9
8
# This program is distributed in the hope that it will be useful,
10
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
11
# GNU General Public License for more details.
13
 
#
 
12
 
14
13
# You should have received a copy of the GNU General Public License
15
14
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
16
 
18
17
import os
19
18
import socket
20
 
import sys
21
19
import threading
22
 
import time
23
 
 
24
 
from bzrlib import (
25
 
    bzrdir,
26
 
    config,
27
 
    errors,
28
 
    tests,
29
 
    transport as _mod_transport,
30
 
    ui,
31
 
    )
32
 
from bzrlib.osutils import (
33
 
    pathjoin,
34
 
    lexists,
35
 
    set_or_unset_env,
36
 
    )
37
 
from bzrlib.tests import (
38
 
    features,
39
 
    TestCaseWithTransport,
40
 
    TestCase,
41
 
    TestSkipped,
42
 
    )
43
 
from bzrlib.tests.http_server import HttpServer
44
 
from bzrlib.transport import get_transport
45
 
import bzrlib.transport.http
46
 
 
47
 
if features.paramiko.available():
48
 
    from bzrlib.transport import sftp as _mod_sftp
49
 
    from bzrlib.tests import stub_sftp
50
 
 
51
 
from bzrlib.workingtree import WorkingTree
52
 
 
53
 
 
54
 
def set_test_transport_to_sftp(testcase):
55
 
    """A helper to set transports on test case instances."""
56
 
    if getattr(testcase, '_get_remote_is_absolute', None) is None:
57
 
        testcase._get_remote_is_absolute = True
58
 
    if testcase._get_remote_is_absolute:
59
 
        testcase.transport_server = stub_sftp.SFTPAbsoluteServer
60
 
    else:
61
 
        testcase.transport_server = stub_sftp.SFTPHomeDirServer
62
 
    testcase.transport_readonly_server = HttpServer
63
 
 
64
 
 
65
 
class TestCaseWithSFTPServer(TestCaseWithTransport):
66
 
    """A test case base class that provides a sftp server on localhost."""
 
20
import unittest
 
21
 
 
22
from bzrlib.tests import TestCaseInTempDir
 
23
from bzrlib.tests.test_transport import TestTransportMixIn
 
24
 
 
25
try:
 
26
    import paramiko
 
27
    from stub_sftp import StubServer, StubSFTPServer
 
28
    paramiko_loaded = True
 
29
except ImportError:
 
30
    paramiko_loaded = False
 
31
 
 
32
 
 
33
STUB_SERVER_KEY = """
 
34
-----BEGIN RSA PRIVATE KEY-----
 
35
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
36
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
37
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
38
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
39
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
40
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
41
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
42
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
43
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
44
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
45
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
46
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
47
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
48
-----END RSA PRIVATE KEY-----
 
49
"""
 
50
    
 
51
 
 
52
class SingleListener (threading.Thread):
 
53
    def __init__(self, callback):
 
54
        threading.Thread.__init__(self)
 
55
        self._callback = callback
 
56
        self._socket = socket.socket()
 
57
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
58
        self._socket.bind(('localhost', 0))
 
59
        self._socket.listen(1)
 
60
        self.port = self._socket.getsockname()[1]
 
61
        self.stop_event = threading.Event()
 
62
 
 
63
    def run(self):
 
64
        s, _ = self._socket.accept()
 
65
        # now close the listen socket
 
66
        self._socket.close()
 
67
        self._callback(s, self.stop_event)
 
68
    
 
69
    def stop(self):
 
70
        self.stop_event.set()
 
71
        
 
72
        
 
73
class TestCaseWithSFTPServer (TestCaseInTempDir):
 
74
    """
 
75
    Execute a test case with a stub SFTP server, serving files from the local
 
76
    filesystem over the loopback network.
 
77
    """
 
78
    
 
79
    def _run_server(self, s, stop_event):
 
80
        ssh_server = paramiko.Transport(s)
 
81
        key_file = os.path.join(self._root, 'test_rsa.key')
 
82
        file(key_file, 'w').write(STUB_SERVER_KEY)
 
83
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
84
        ssh_server.add_server_key(host_key)
 
85
        server = StubServer()
 
86
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
 
87
        event = threading.Event()
 
88
        ssh_server.start_server(event, server)
 
89
        event.wait(5.0)
 
90
        stop_event.wait(30.0)
67
91
 
68
92
    def setUp(self):
69
 
        super(TestCaseWithSFTPServer, self).setUp()
70
 
        self.requireFeature(features.paramiko)
71
 
        set_test_transport_to_sftp(self)
72
 
 
73
 
 
74
 
class SFTPLockTests(TestCaseWithSFTPServer):
 
93
        TestCaseInTempDir.setUp(self)
 
94
        self._root = self.test_dir
 
95
 
 
96
    def delayed_setup(self):
 
97
        # some tests are just stubs that call setUp and then immediately call
 
98
        # tearDwon.  so don't create the port listener until get_transport is
 
99
        # called and we know we're in an actual test.
 
100
        self._listener = SingleListener(self._run_server)
 
101
        self._listener.setDaemon(True)
 
102
        self._listener.start()        
 
103
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
 
104
        
 
105
    def tearDown(self):
 
106
        try:
 
107
            self._listener.stop()
 
108
        except AttributeError:
 
109
            pass
 
110
        TestCaseInTempDir.tearDown(self)
 
111
 
 
112
        
 
113
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
 
114
    readonly = False
 
115
    setup = True
 
116
 
 
117
    def get_transport(self):
 
118
        if self.setup:
 
119
            self.delayed_setup()
 
120
            self.setup = False
 
121
        from bzrlib.transport.sftp import SFTPTransport
 
122
        url = self._sftp_url
 
123
        return SFTPTransport(url)
75
124
 
76
125
    def test_sftp_locks(self):
77
126
        from bzrlib.errors import LockError
85
134
        self.assertRaises(LockError, t.lock_write, 'bogus')
86
135
 
87
136
        l.unlock()
88
 
        self.failIf(lexists('bogus.write-lock'))
 
137
        self.failIf(os.path.lexists('bogus.write-lock'))
89
138
 
90
139
        open('something.write-lock', 'wb').write('fake lock\n')
91
140
        self.assertRaises(LockError, t.lock_write, 'something')
99
148
        l2.unlock()
100
149
 
101
150
 
102
 
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
103
 
    """Test the SFTP transport with homedir based relative paths."""
104
 
 
105
 
    def test__remote_path(self):
106
 
        if sys.platform == 'darwin':
107
 
            # This test is about sftp absolute path handling. There is already
108
 
            # (in this test) a TODO about windows needing an absolute path
109
 
            # without drive letter. To me, using self.test_dir is a trick to
110
 
            # get an absolute path for comparison purposes.  That fails for OSX
111
 
            # because the sftp server doesn't resolve the links (and it doesn't
112
 
            # have to). --vila 20070924
113
 
            self.knownFailure('Mac OSX symlinks /tmp to /private/tmp,'
114
 
                              ' testing against self.test_dir'
115
 
                              ' is not appropriate')
116
 
        t = self.get_transport()
117
 
        # This test require unix-like absolute path
118
 
        test_dir = self.test_dir
119
 
        if sys.platform == 'win32':
120
 
            # using hack suggested by John Meinel.
121
 
            # TODO: write another mock server for this test
122
 
            #       and use absolute path without drive letter
123
 
            test_dir = '/' + test_dir
124
 
        # try what is currently used:
125
 
        # remote path = self._abspath(relpath)
126
 
        self.assertIsSameRealPath(test_dir + '/relative',
127
 
                                  t._remote_path('relative'))
128
 
        # we dont os.path.join because windows gives us the wrong path
129
 
        root_segments = test_dir.split('/')
130
 
        root_parent = '/'.join(root_segments[:-1])
131
 
        # .. should be honoured
132
 
        self.assertIsSameRealPath(root_parent + '/sibling',
133
 
                                  t._remote_path('../sibling'))
134
 
        # /  should be illegal ?
135
 
        ### FIXME decide and then test for all transports. RBC20051208
136
 
 
137
 
 
138
 
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
139
 
    """Test the SFTP transport with homedir based relative paths."""
140
 
 
141
 
    def setUp(self):
142
 
        # Only SFTPHomeDirServer is tested here
143
 
        self._get_remote_is_absolute = False
144
 
        super(SFTPTransportTestRelativeRoot, self).setUp()
145
 
 
146
 
    def test__remote_path_relative_root(self):
147
 
        # relative paths are preserved
148
 
        t = self.get_transport('')
149
 
        self.assertEqual('/~/', t._path)
150
 
        # the remote path should be relative to home dir
151
 
        # (i.e. not begining with a '/')
152
 
        self.assertEqual('a', t._remote_path('a'))
153
 
 
154
 
 
155
 
class SFTPNonServerTest(TestCase):
156
 
    def setUp(self):
157
 
        TestCase.setUp(self)
158
 
        self.requireFeature(features.paramiko)
159
 
 
160
 
    def test_parse_url_with_home_dir(self):
161
 
        s = _mod_sftp.SFTPTransport(
162
 
            'sftp://ro%62ey:h%40t@example.com:2222/~/relative')
 
151
class FakeSFTPTransport (object):
 
152
    _sftp = object()
 
153
fake = FakeSFTPTransport()
 
154
 
 
155
 
 
156
class SFTPNonServerTest(unittest.TestCase):
 
157
    def test_parse_url(self):
 
158
        from bzrlib.transport.sftp import SFTPTransport
 
159
        s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
 
160
        self.assertEquals(s._host, 'simple.example.com')
 
161
        self.assertEquals(s._port, 22)
 
162
        self.assertEquals(s._path, '/home/source')
 
163
        self.assert_(s._password is None)
 
164
        
 
165
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
163
166
        self.assertEquals(s._host, 'example.com')
164
167
        self.assertEquals(s._port, 2222)
165
 
        self.assertEquals(s._user, 'robey')
 
168
        self.assertEquals(s._username, 'robey')
166
169
        self.assertEquals(s._password, 'h@t')
167
 
        self.assertEquals(s._path, '/~/relative/')
168
 
 
169
 
    def test_relpath(self):
170
 
        s = _mod_sftp.SFTPTransport('sftp://user@host.com/abs/path')
171
 
        self.assertRaises(errors.PathNotChild, s.relpath,
172
 
                          'sftp://user@host.com/~/rel/path/sub')
173
 
 
174
 
    def test_get_paramiko_vendor(self):
175
 
        """Test that if no 'ssh' is available we get builtin paramiko"""
176
 
        from bzrlib.transport import ssh
177
 
        # set '.' as the only location in the path, forcing no 'ssh' to exist
178
 
        orig_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
179
 
        orig_path = set_or_unset_env('PATH', '.')
180
 
        try:
181
 
            # No vendor defined yet, query for one
182
 
            ssh._ssh_vendor_manager.clear_cache()
183
 
            vendor = ssh._get_ssh_vendor()
184
 
            self.assertIsInstance(vendor, ssh.ParamikoVendor)
185
 
        finally:
186
 
            set_or_unset_env('PATH', orig_path)
187
 
            ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
188
 
 
189
 
    def test_abspath_root_sibling_server(self):
190
 
        server = stub_sftp.SFTPSiblingAbsoluteServer()
191
 
        server.start_server()
192
 
        try:
193
 
            transport = get_transport(server.get_url())
194
 
            self.assertFalse(transport.abspath('/').endswith('/~/'))
195
 
            self.assertTrue(transport.abspath('/').endswith('/'))
196
 
            del transport
197
 
        finally:
198
 
            server.stop_server()
199
 
 
 
170
        self.assertEquals(s._path, 'relative')
 
171
 
 
172
    def test_parse_invalid_url(self):
 
173
        from bzrlib.transport.sftp import SFTPTransport, SFTPTransportError
 
174
        try:
 
175
            s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
 
176
                              clone_from=fake)
 
177
            self.fail('expected exception not raised')
 
178
        except SFTPTransportError, e:
 
179
            self.assertEquals(str(e), 
 
180
                    '~janneke: invalid port number')
 
181
 
 
182
        
200
183
 
201
184
class SFTPBranchTest(TestCaseWithSFTPServer):
202
185
    """Test some stuff when accessing a bzr Branch over sftp"""
203
186
 
204
187
    def test_lock_file(self):
205
 
        # old format branches use a special lock file on sftp.
206
 
        b = self.make_branch('', format=bzrdir.BzrDirFormat6())
207
 
        b = bzrlib.branch.Branch.open(self.get_url())
 
188
        """Make sure that a Branch accessed over sftp tries to lock itself."""
 
189
        from bzrlib.branch import Branch
 
190
 
 
191
        self.delayed_setup()
 
192
        b = Branch.initialize(self._sftp_url)
208
193
        self.failUnlessExists('.bzr/')
209
194
        self.failUnlessExists('.bzr/branch-format')
210
195
        self.failUnlessExists('.bzr/branch-lock')
211
196
 
212
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
197
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
213
198
        b.lock_write()
214
199
        self.failUnlessExists('.bzr/branch-lock.write-lock')
215
200
        b.unlock()
216
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
217
 
 
218
 
    def test_push_support(self):
219
 
        self.build_tree(['a/', 'a/foo'])
220
 
        t = bzrdir.BzrDir.create_standalone_workingtree('a')
221
 
        b = t.branch
222
 
        t.add('foo')
223
 
        t.commit('foo', rev_id='a1')
224
 
 
225
 
        b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
226
 
        b2.pull(b)
227
 
 
228
 
        self.assertEquals(b2.revision_history(), ['a1'])
229
 
 
230
 
        open('a/foo', 'wt').write('something new in foo\n')
231
 
        t.commit('new', rev_id='a2')
232
 
        b2.pull(b)
233
 
 
234
 
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
235
 
 
236
 
 
237
 
class SSHVendorConnection(TestCaseWithSFTPServer):
238
 
    """Test that the ssh vendors can all connect.
239
 
 
240
 
    Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
241
 
 
242
 
    We have 3 sftp implementations in the test suite:
243
 
      'loopback': Doesn't use ssh, just uses a local socket. Most tests are
244
 
                  done this way to save the handshaking time, so it is not
245
 
                  tested again here
246
 
      'none':     This uses paramiko's built-in ssh client and server, and layers
247
 
                  sftp on top of it.
248
 
      None:       If 'ssh' exists on the machine, then it will be spawned as a
249
 
                  child process.
250
 
    """
251
 
 
252
 
    def setUp(self):
253
 
        super(SSHVendorConnection, self).setUp()
254
 
 
255
 
        def create_server():
256
 
            """Just a wrapper so that when created, it will set _vendor"""
257
 
            # SFTPFullAbsoluteServer can handle any vendor,
258
 
            # it just needs to be set between the time it is instantiated
259
 
            # and the time .setUp() is called
260
 
            server = stub_sftp.SFTPFullAbsoluteServer()
261
 
            server._vendor = self._test_vendor
262
 
            return server
263
 
        self._test_vendor = 'loopback'
264
 
        self.vfs_transport_server = create_server
265
 
        f = open('a_file', 'wb')
266
 
        try:
267
 
            f.write('foobar\n')
268
 
        finally:
269
 
            f.close()
270
 
 
271
 
    def set_vendor(self, vendor):
272
 
        self._test_vendor = vendor
273
 
 
274
 
    def test_connection_paramiko(self):
275
 
        from bzrlib.transport import ssh
276
 
        self.set_vendor(ssh.ParamikoVendor())
277
 
        t = self.get_transport()
278
 
        self.assertEqual('foobar\n', t.get('a_file').read())
279
 
 
280
 
    def test_connection_vendor(self):
281
 
        raise TestSkipped("We don't test spawning real ssh,"
282
 
                          " because it prompts for a password."
283
 
                          " Enable this test if we figure out"
284
 
                          " how to prevent this.")
285
 
        self.set_vendor(None)
286
 
        t = self.get_transport()
287
 
        self.assertEqual('foobar\n', t.get('a_file').read())
288
 
 
289
 
 
290
 
class SSHVendorBadConnection(TestCaseWithTransport):
291
 
    """Test that the ssh vendors handle bad connection properly
292
 
 
293
 
    We don't subclass TestCaseWithSFTPServer, because we don't actually
294
 
    need an SFTP connection.
295
 
    """
296
 
 
297
 
    def setUp(self):
298
 
        self.requireFeature(features.paramiko)
299
 
        super(SSHVendorBadConnection, self).setUp()
300
 
 
301
 
        # open a random port, so we know nobody else is using it
302
 
        # but don't actually listen on the port.
303
 
        s = socket.socket()
304
 
        s.bind(('localhost', 0))
305
 
        self.addCleanup(s.close)
306
 
        self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
307
 
 
308
 
    def set_vendor(self, vendor):
309
 
        from bzrlib.transport import ssh
310
 
        self.overrideAttr(ssh._ssh_vendor_manager, '_cached_ssh_vendor', vendor)
311
 
 
312
 
    def test_bad_connection_paramiko(self):
313
 
        """Test that a real connection attempt raises the right error"""
314
 
        from bzrlib.transport import ssh
315
 
        self.set_vendor(ssh.ParamikoVendor())
316
 
        t = bzrlib.transport.get_transport(self.bogus_url)
317
 
        self.assertRaises(errors.ConnectionError, t.get, 'foobar')
318
 
 
319
 
    def test_bad_connection_ssh(self):
320
 
        """None => auto-detect vendor"""
321
 
        self.set_vendor(None)
322
 
        # This is how I would normally test the connection code
323
 
        # it makes it very clear what we are testing.
324
 
        # However, 'ssh' will create stipple on the output, so instead
325
 
        # I'm using run_bzr_subprocess, and parsing the output
326
 
        # try:
327
 
        #     t = bzrlib.transport.get_transport(self.bogus_url)
328
 
        # except errors.ConnectionError:
329
 
        #     # Correct error
330
 
        #     pass
331
 
        # except errors.NameError, e:
332
 
        #     if 'SSHException' in str(e):
333
 
        #         raise TestSkipped('Known NameError bug in paramiko 1.6.1')
334
 
        #     raise
335
 
        # else:
336
 
        #     self.fail('Excepted ConnectionError to be raised')
337
 
 
338
 
        out, err = self.run_bzr_subprocess(['log', self.bogus_url], retcode=3)
339
 
        self.assertEqual('', out)
340
 
        if "NameError: global name 'SSHException'" in err:
341
 
            # We aren't fixing this bug, because it is a bug in
342
 
            # paramiko, but we know about it, so we don't have to
343
 
            # fail the test
344
 
            raise TestSkipped('Known NameError bug with paramiko-1.6.1')
345
 
        self.assertContainsRe(err, r'bzr: ERROR: Unable to connect to SSH host'
346
 
                                   r' 127\.0\.0\.1:\d+; ')
347
 
 
348
 
 
349
 
class SFTPLatencyKnob(TestCaseWithSFTPServer):
350
 
    """Test that the testing SFTPServer's latency knob works."""
351
 
 
352
 
    def test_latency_knob_slows_transport(self):
353
 
        # change the latency knob to 500ms. We take about 40ms for a
354
 
        # loopback connection ordinarily.
355
 
        start_time = time.time()
356
 
        self.get_server().add_latency = 0.5
357
 
        transport = self.get_transport()
358
 
        transport.has('not me') # Force connection by issuing a request
359
 
        with_latency_knob_time = time.time() - start_time
360
 
        self.assertTrue(with_latency_knob_time > 0.4)
361
 
 
362
 
    def test_default(self):
363
 
        # This test is potentially brittle: under extremely high machine load
364
 
        # it could fail, but that is quite unlikely
365
 
        raise TestSkipped('Timing-sensitive test')
366
 
        start_time = time.time()
367
 
        transport = self.get_transport()
368
 
        transport.has('not me') # Force connection by issuing a request
369
 
        regular_time = time.time() - start_time
370
 
        self.assertTrue(regular_time < 0.5)
371
 
 
372
 
 
373
 
class FakeSocket(object):
374
 
    """Fake socket object used to test the SocketDelay wrapper without
375
 
    using a real socket.
376
 
    """
377
 
 
378
 
    def __init__(self):
379
 
        self._data = ""
380
 
 
381
 
    def send(self, data, flags=0):
382
 
        self._data += data
383
 
        return len(data)
384
 
 
385
 
    def sendall(self, data, flags=0):
386
 
        self._data += data
387
 
        return len(data)
388
 
 
389
 
    def recv(self, size, flags=0):
390
 
        if size < len(self._data):
391
 
            result = self._data[:size]
392
 
            self._data = self._data[size:]
393
 
            return result
394
 
        else:
395
 
            result = self._data
396
 
            self._data = ""
397
 
            return result
398
 
 
399
 
 
400
 
class TestSocketDelay(TestCase):
401
 
 
402
 
    def setUp(self):
403
 
        TestCase.setUp(self)
404
 
        self.requireFeature(features.paramiko)
405
 
 
406
 
    def test_delay(self):
407
 
        sending = FakeSocket()
408
 
        receiving = stub_sftp.SocketDelay(sending, 0.1, bandwidth=1000000,
409
 
                                          really_sleep=False)
410
 
        # check that simulated time is charged only per round-trip:
411
 
        t1 = stub_sftp.SocketDelay.simulated_time
412
 
        receiving.send("connect1")
413
 
        self.assertEqual(sending.recv(1024), "connect1")
414
 
        t2 = stub_sftp.SocketDelay.simulated_time
415
 
        self.assertAlmostEqual(t2 - t1, 0.1)
416
 
        receiving.send("connect2")
417
 
        self.assertEqual(sending.recv(1024), "connect2")
418
 
        sending.send("hello")
419
 
        self.assertEqual(receiving.recv(1024), "hello")
420
 
        t3 = stub_sftp.SocketDelay.simulated_time
421
 
        self.assertAlmostEqual(t3 - t2, 0.1)
422
 
        sending.send("hello")
423
 
        self.assertEqual(receiving.recv(1024), "hello")
424
 
        sending.send("hello")
425
 
        self.assertEqual(receiving.recv(1024), "hello")
426
 
        sending.send("hello")
427
 
        self.assertEqual(receiving.recv(1024), "hello")
428
 
        t4 = stub_sftp.SocketDelay.simulated_time
429
 
        self.assertAlmostEqual(t4, t3)
430
 
 
431
 
    def test_bandwidth(self):
432
 
        sending = FakeSocket()
433
 
        receiving = stub_sftp.SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
434
 
                                          really_sleep=False)
435
 
        # check that simulated time is charged only per round-trip:
436
 
        t1 = stub_sftp.SocketDelay.simulated_time
437
 
        receiving.send("connect")
438
 
        self.assertEqual(sending.recv(1024), "connect")
439
 
        sending.send("a" * 100)
440
 
        self.assertEqual(receiving.recv(1024), "a" * 100)
441
 
        t2 = stub_sftp.SocketDelay.simulated_time
442
 
        self.assertAlmostEqual(t2 - t1, 100 + 7)
443
 
 
444
 
 
445
 
class ReadvFile(object):
446
 
    """An object that acts like Paramiko's SFTPFile.readv()"""
447
 
 
448
 
    def __init__(self, data):
449
 
        self._data = data
450
 
 
451
 
    def readv(self, requests):
452
 
        for start, length in requests:
453
 
            yield self._data[start:start+length]
454
 
 
455
 
 
456
 
def _null_report_activity(*a, **k):
457
 
    pass
458
 
 
459
 
 
460
 
class Test_SFTPReadvHelper(tests.TestCase):
461
 
 
462
 
    def checkGetRequests(self, expected_requests, offsets):
463
 
        self.requireFeature(features.paramiko)
464
 
        helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test',
465
 
            _null_report_activity)
466
 
        self.assertEqual(expected_requests, helper._get_requests())
467
 
 
468
 
    def test__get_requests(self):
469
 
        # Small single requests become a single readv request
470
 
        self.checkGetRequests([(0, 100)],
471
 
                              [(0, 20), (30, 50), (20, 10), (80, 20)])
472
 
        # Non-contiguous ranges are given as multiple requests
473
 
        self.checkGetRequests([(0, 20), (30, 50)],
474
 
                              [(10, 10), (30, 20), (0, 10), (50, 30)])
475
 
        # Ranges larger than _max_request_size (32kB) are broken up into
476
 
        # multiple requests, even if it actually spans multiple logical
477
 
        # requests
478
 
        self.checkGetRequests([(0, 32768), (32768, 32768), (65536, 464)],
479
 
                              [(0, 40000), (40000, 100), (40100, 1900),
480
 
                               (42000, 24000)])
481
 
 
482
 
    def checkRequestAndYield(self, expected, data, offsets):
483
 
        self.requireFeature(features.paramiko)
484
 
        helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test',
485
 
            _null_report_activity)
486
 
        data_f = ReadvFile(data)
487
 
        result = list(helper.request_and_yield_offsets(data_f))
488
 
        self.assertEqual(expected, result)
489
 
 
490
 
    def test_request_and_yield_offsets(self):
491
 
        data = 'abcdefghijklmnopqrstuvwxyz'
492
 
        self.checkRequestAndYield([(0, 'a'), (5, 'f'), (10, 'klm')], data,
493
 
                                  [(0, 1), (5, 1), (10, 3)])
494
 
        # Should combine requests, and split them again
495
 
        self.checkRequestAndYield([(0, 'a'), (1, 'b'), (10, 'klm')], data,
496
 
                                  [(0, 1), (1, 1), (10, 3)])
497
 
        # Out of order requests. The requests should get combined, but then be
498
 
        # yielded out-of-order. We also need one that is at the end of a
499
 
        # previous range. See bug #293746
500
 
        self.checkRequestAndYield([(0, 'a'), (10, 'k'), (4, 'efg'), (1, 'bcd')],
501
 
                                  data, [(0, 1), (10, 1), (4, 3), (1, 3)])
502
 
 
503
 
 
504
 
class TestUsesAuthConfig(TestCaseWithSFTPServer):
505
 
    """Test that AuthenticationConfig can supply default usernames."""
506
 
 
507
 
    def get_transport_for_connection(self, set_config):
508
 
        port = self.get_server()._listener.port
509
 
        if set_config:
510
 
            conf = config.AuthenticationConfig()
511
 
            conf._get_config().update(
512
 
                {'sftptest': {'scheme': 'ssh', 'port': port, 'user': 'bar'}})
513
 
            conf._save()
514
 
        t = get_transport('sftp://localhost:%d' % port)
515
 
        # force a connection to be performed.
516
 
        t.has('foo')
517
 
        return t
518
 
 
519
 
    def test_sftp_uses_config(self):
520
 
        t = self.get_transport_for_connection(set_config=True)
521
 
        self.assertEqual('bar', t._get_credentials()[0])
522
 
 
523
 
    def test_sftp_is_none_if_no_config(self):
524
 
        t = self.get_transport_for_connection(set_config=False)
525
 
        self.assertIs(None, t._get_credentials()[0])
526
 
 
527
 
    def test_sftp_doesnt_prompt_username(self):
528
 
        stdout = tests.StringIOWrapper()
529
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n', stdout=stdout)
530
 
        t = self.get_transport_for_connection(set_config=False)
531
 
        self.assertIs(None, t._get_credentials()[0])
532
 
        # No prompts should've been printed, stdin shouldn't have been read
533
 
        self.assertEquals("", stdout.getvalue())
534
 
        self.assertEquals(0, ui.ui_factory.stdin.tell())
 
201
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
 
202
 
 
203
 
 
204
if not paramiko_loaded:
 
205
    # TODO: Skip these
 
206
    del SFTPTransportTest
 
207
    del SFTPNonServerTest
 
208
    del SFTPBranchTest