~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp.py

  • Committer: Robert Collins
  • Date: 2005-12-12 22:34:21 UTC
  • Revision ID: robertc@robertcollins.net-20051212223421-c0f6e7a7fae0b5ee
Bugfix to source testing logic to get the right path when .py is returned by __file__

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Robey Pointer <robey@lag.net>
2
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
 
#
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
 
2
 
4
3
# This program is free software; you can redistribute it and/or modify
5
4
# it under the terms of the GNU General Public License as published by
6
5
# the Free Software Foundation; either version 2 of the License, or
7
6
# (at your option) any later version.
8
 
#
 
7
 
9
8
# This program is distributed in the hope that it will be useful,
10
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
11
# GNU General Public License for more details.
13
 
#
 
12
 
14
13
# You should have received a copy of the GNU General Public License
15
14
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
16
 
18
17
import os
19
18
import socket
20
 
import sys
21
19
import threading
22
 
import time
23
 
 
24
 
from bzrlib import (
25
 
    bzrdir,
26
 
    config,
27
 
    errors,
28
 
    tests,
29
 
    transport as _mod_transport,
30
 
    ui,
31
 
    )
32
 
from bzrlib.osutils import (
33
 
    pathjoin,
34
 
    lexists,
35
 
    set_or_unset_env,
36
 
    )
37
 
from bzrlib.tests import (
38
 
    features,
39
 
    TestCaseWithTransport,
40
 
    TestCase,
41
 
    TestSkipped,
42
 
    )
43
 
from bzrlib.tests.http_server import HttpServer
44
 
from bzrlib.transport import get_transport
45
 
import bzrlib.transport.http
46
 
 
47
 
if features.paramiko.available():
48
 
    from bzrlib.transport import sftp as _mod_sftp
49
 
    from bzrlib.tests import stub_sftp
50
 
 
51
 
from bzrlib.workingtree import WorkingTree
52
 
 
53
 
 
54
 
def set_test_transport_to_sftp(testcase):
55
 
    """A helper to set transports on test case instances."""
56
 
    if getattr(testcase, '_get_remote_is_absolute', None) is None:
57
 
        testcase._get_remote_is_absolute = True
58
 
    if testcase._get_remote_is_absolute:
59
 
        testcase.transport_server = stub_sftp.SFTPAbsoluteServer
60
 
    else:
61
 
        testcase.transport_server = stub_sftp.SFTPHomeDirServer
62
 
    testcase.transport_readonly_server = HttpServer
63
 
 
64
 
 
65
 
class TestCaseWithSFTPServer(TestCaseWithTransport):
66
 
    """A test case base class that provides a sftp server on localhost."""
67
 
 
68
 
    def setUp(self):
69
 
        super(TestCaseWithSFTPServer, self).setUp()
70
 
        self.requireFeature(features.paramiko)
71
 
        set_test_transport_to_sftp(self)
72
 
 
73
 
 
74
 
class SFTPLockTests(TestCaseWithSFTPServer):
 
20
 
 
21
from bzrlib.tests import TestCaseInTempDir, TestCase
 
22
from bzrlib.tests.test_transport import TestTransportMixIn
 
23
import bzrlib.errors as errors
 
24
 
 
25
try:
 
26
    import paramiko
 
27
    from stub_sftp import StubServer, StubSFTPServer
 
28
    paramiko_loaded = True
 
29
except ImportError:
 
30
    paramiko_loaded = False
 
31
 
 
32
# XXX: 20051124 jamesh
 
33
# The tests currently pop up a password prompt when an external ssh
 
34
# is used.  This forces the use of the paramiko implementation.
 
35
if paramiko_loaded:
 
36
    import bzrlib.transport.sftp
 
37
    bzrlib.transport.sftp._ssh_vendor = 'none'
 
38
 
 
39
 
 
40
STUB_SERVER_KEY = """
 
41
-----BEGIN RSA PRIVATE KEY-----
 
42
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
43
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
44
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
45
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
46
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
47
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
48
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
49
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
50
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
51
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
52
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
53
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
54
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
55
-----END RSA PRIVATE KEY-----
 
56
"""
 
57
    
 
58
 
 
59
class SingleListener (threading.Thread):
 
60
    def __init__(self, callback):
 
61
        threading.Thread.__init__(self)
 
62
        self._callback = callback
 
63
        self._socket = socket.socket()
 
64
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
65
        self._socket.bind(('localhost', 0))
 
66
        self._socket.listen(1)
 
67
        self.port = self._socket.getsockname()[1]
 
68
        self.stop_event = threading.Event()
 
69
 
 
70
    def run(self):
 
71
        s, _ = self._socket.accept()
 
72
        # now close the listen socket
 
73
        self._socket.close()
 
74
        self._callback(s, self.stop_event)
 
75
    
 
76
    def stop(self):
 
77
        self.stop_event.set()
 
78
        # We should consider waiting for the other thread
 
79
        # to stop, because otherwise we get spurious
 
80
        #   bzr: ERROR: Socket exception: Connection reset by peer (54)
 
81
        # because the test suite finishes before the thread has a chance
 
82
        # to close. (Especially when only running a few tests)
 
83
        
 
84
        
 
85
class TestCaseWithSFTPServer (TestCaseInTempDir):
 
86
    """
 
87
    Execute a test case with a stub SFTP server, serving files from the local
 
88
    filesystem over the loopback network.
 
89
    """
 
90
    
 
91
    def _run_server(self, s, stop_event):
 
92
        ssh_server = paramiko.Transport(s)
 
93
        key_file = os.path.join(self._root, 'test_rsa.key')
 
94
        file(key_file, 'w').write(STUB_SERVER_KEY)
 
95
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
96
        ssh_server.add_server_key(host_key)
 
97
        server = StubServer(self)
 
98
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
 
99
        event = threading.Event()
 
100
        ssh_server.start_server(event, server)
 
101
        event.wait(5.0)
 
102
        stop_event.wait(30.0)
 
103
 
 
104
    def setUp(self):
 
105
        TestCaseInTempDir.setUp(self)
 
106
        self._root = self.test_dir
 
107
        self._is_setup = False
 
108
 
 
109
    def delayed_setup(self):
 
110
        # some tests are just stubs that call setUp and then immediately call
 
111
        # tearDwon.  so don't create the port listener until get_transport is
 
112
        # called and we know we're in an actual test.
 
113
        if self._is_setup:
 
114
            return
 
115
        self._listener = SingleListener(self._run_server)
 
116
        self._listener.setDaemon(True)
 
117
        self._listener.start()        
 
118
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
 
119
        self._is_setup = True
 
120
        
 
121
    def tearDown(self):
 
122
        try:
 
123
            self._listener.stop()
 
124
        except AttributeError:
 
125
            pass
 
126
        TestCaseInTempDir.tearDown(self)
 
127
 
 
128
        
 
129
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
 
130
    readonly = False
 
131
 
 
132
    def setUp(self):
 
133
        TestCaseWithSFTPServer.setUp(self)
 
134
        self.sftplogs = []
 
135
 
 
136
    def log(self, *args):
 
137
        """Override the default log to grab sftp server messages"""
 
138
        TestCaseWithSFTPServer.log(self, *args)
 
139
        if args and args[0].startswith('sftpserver'):
 
140
            self.sftplogs.append(args[0])
 
141
 
 
142
    def get_transport(self):
 
143
        self.delayed_setup()
 
144
        from bzrlib.transport.sftp import SFTPTransport
 
145
        url = self._sftp_url
 
146
        return SFTPTransport(url)
75
147
 
76
148
    def test_sftp_locks(self):
77
149
        from bzrlib.errors import LockError
85
157
        self.assertRaises(LockError, t.lock_write, 'bogus')
86
158
 
87
159
        l.unlock()
88
 
        self.failIf(lexists('bogus.write-lock'))
 
160
        self.failIf(os.path.lexists('bogus.write-lock'))
89
161
 
90
162
        open('something.write-lock', 'wb').write('fake lock\n')
91
163
        self.assertRaises(LockError, t.lock_write, 'something')
98
170
        l.unlock()
99
171
        l2.unlock()
100
172
 
101
 
 
102
 
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
103
 
    """Test the SFTP transport with homedir based relative paths."""
104
 
 
105
 
    def test__remote_path(self):
106
 
        if sys.platform == 'darwin':
107
 
            # This test is about sftp absolute path handling. There is already
108
 
            # (in this test) a TODO about windows needing an absolute path
109
 
            # without drive letter. To me, using self.test_dir is a trick to
110
 
            # get an absolute path for comparison purposes.  That fails for OSX
111
 
            # because the sftp server doesn't resolve the links (and it doesn't
112
 
            # have to). --vila 20070924
113
 
            self.knownFailure('Mac OSX symlinks /tmp to /private/tmp,'
114
 
                              ' testing against self.test_dir'
115
 
                              ' is not appropriate')
 
173
    def test_multiple_connections(self):
116
174
        t = self.get_transport()
117
 
        # This test require unix-like absolute path
118
 
        test_dir = self.test_dir
119
 
        if sys.platform == 'win32':
120
 
            # using hack suggested by John Meinel.
121
 
            # TODO: write another mock server for this test
122
 
            #       and use absolute path without drive letter
123
 
            test_dir = '/' + test_dir
124
 
        # try what is currently used:
125
 
        # remote path = self._abspath(relpath)
126
 
        self.assertIsSameRealPath(test_dir + '/relative',
127
 
                                  t._remote_path('relative'))
128
 
        # we dont os.path.join because windows gives us the wrong path
129
 
        root_segments = test_dir.split('/')
130
 
        root_parent = '/'.join(root_segments[:-1])
131
 
        # .. should be honoured
132
 
        self.assertIsSameRealPath(root_parent + '/sibling',
133
 
                                  t._remote_path('../sibling'))
134
 
        # /  should be illegal ?
135
 
        ### FIXME decide and then test for all transports. RBC20051208
136
 
 
137
 
 
138
 
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
139
 
    """Test the SFTP transport with homedir based relative paths."""
140
 
 
141
 
    def setUp(self):
142
 
        # Only SFTPHomeDirServer is tested here
143
 
        self._get_remote_is_absolute = False
144
 
        super(SFTPTransportTestRelativeRoot, self).setUp()
145
 
 
146
 
    def test__remote_path_relative_root(self):
147
 
        # relative paths are preserved
148
 
        t = self.get_transport('')
149
 
        self.assertEqual('/~/', t._path)
150
 
        # the remote path should be relative to home dir
151
 
        # (i.e. not begining with a '/')
152
 
        self.assertEqual('a', t._remote_path('a'))
 
175
        self.assertEquals(self.sftplogs, 
 
176
                ['sftpserver - authorizing: foo'
 
177
               , 'sftpserver - channel request: session, 1'])
 
178
        self.sftplogs = []
 
179
        # The second request should reuse the first connection
 
180
        # SingleListener only allows for a single connection,
 
181
        # So the next line fails unless the connection is reused
 
182
        t2 = self.get_transport()
 
183
        self.assertEquals(self.sftplogs, [])
 
184
 
 
185
 
 
186
class FakeSFTPTransport (object):
 
187
    _sftp = object()
 
188
fake = FakeSFTPTransport()
153
189
 
154
190
 
155
191
class SFTPNonServerTest(TestCase):
156
 
    def setUp(self):
157
 
        TestCase.setUp(self)
158
 
        self.requireFeature(features.paramiko)
 
192
    def test_parse_url(self):
 
193
        from bzrlib.transport.sftp import SFTPTransport
 
194
        s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
 
195
        self.assertEquals(s._host, 'simple.example.com')
 
196
        self.assertEquals(s._port, None)
 
197
        self.assertEquals(s._path, '/home/source')
 
198
        self.failUnless(s._password is None)
159
199
 
160
 
    def test_parse_url_with_home_dir(self):
161
 
        s = _mod_sftp.SFTPTransport(
162
 
            'sftp://ro%62ey:h%40t@example.com:2222/~/relative')
 
200
        self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source')
 
201
        
 
202
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
163
203
        self.assertEquals(s._host, 'example.com')
164
204
        self.assertEquals(s._port, 2222)
165
 
        self.assertEquals(s._user, 'robey')
 
205
        self.assertEquals(s._username, 'robey')
166
206
        self.assertEquals(s._password, 'h@t')
167
 
        self.assertEquals(s._path, '/~/relative/')
 
207
        self.assertEquals(s._path, 'relative')
 
208
 
 
209
        # Base should not keep track of the password
 
210
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative')
 
211
 
 
212
        # Double slash should be accepted instead of using %2F
 
213
        s = SFTPTransport('sftp://user@example.com:22//absolute/path', clone_from=fake)
 
214
        self.assertEquals(s._host, 'example.com')
 
215
        self.assertEquals(s._port, 22)
 
216
        self.assertEquals(s._username, 'user')
 
217
        self.assertEquals(s._password, None)
 
218
        self.assertEquals(s._path, '/absolute/path')
 
219
 
 
220
        # Also, don't show the port if it is the default 22
 
221
        self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path')
168
222
 
169
223
    def test_relpath(self):
170
 
        s = _mod_sftp.SFTPTransport('sftp://user@host.com/abs/path')
171
 
        self.assertRaises(errors.PathNotChild, s.relpath,
172
 
                          'sftp://user@host.com/~/rel/path/sub')
173
 
 
174
 
    def test_get_paramiko_vendor(self):
175
 
        """Test that if no 'ssh' is available we get builtin paramiko"""
176
 
        from bzrlib.transport import ssh
177
 
        # set '.' as the only location in the path, forcing no 'ssh' to exist
178
 
        orig_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
179
 
        orig_path = set_or_unset_env('PATH', '.')
180
 
        try:
181
 
            # No vendor defined yet, query for one
182
 
            ssh._ssh_vendor_manager.clear_cache()
183
 
            vendor = ssh._get_ssh_vendor()
184
 
            self.assertIsInstance(vendor, ssh.ParamikoVendor)
185
 
        finally:
186
 
            set_or_unset_env('PATH', orig_path)
187
 
            ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
188
 
 
189
 
    def test_abspath_root_sibling_server(self):
190
 
        server = stub_sftp.SFTPSiblingAbsoluteServer()
191
 
        server.start_server()
192
 
        try:
193
 
            transport = get_transport(server.get_url())
194
 
            self.assertFalse(transport.abspath('/').endswith('/~/'))
195
 
            self.assertTrue(transport.abspath('/').endswith('/'))
196
 
            del transport
197
 
        finally:
198
 
            server.stop_server()
 
224
        from bzrlib.transport.sftp import SFTPTransport
 
225
        from bzrlib.errors import PathNotChild
 
226
 
 
227
        s = SFTPTransport('sftp://user@host.com//abs/path', clone_from=fake)
 
228
        self.assertEquals(s.relpath('sftp://user@host.com//abs/path/sub'), 'sub')
 
229
        # Can't test this one, because we actually get an AssertionError
 
230
        # TODO: Consider raising an exception rather than an assert
 
231
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com//abs/path/sub')
 
232
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com//abs/path/sub')
 
233
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com//abs/path/sub')
 
234
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33//abs/path/sub')
 
235
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/abs/path/sub')
 
236
 
 
237
        # Make sure it works when we don't supply a username
 
238
        s = SFTPTransport('sftp://host.com//abs/path', clone_from=fake)
 
239
        self.assertEquals(s.relpath('sftp://host.com//abs/path/sub'), 'sub')
 
240
 
 
241
        # Make sure it works when parts of the path will be url encoded
 
242
        # TODO: These may be incorrect, we might need to urllib.urlencode() before
 
243
        # we pass the paths into the SFTPTransport constructor
 
244
        s = SFTPTransport('sftp://host.com/dev/,path', clone_from=fake)
 
245
        self.assertEquals(s.relpath('sftp://host.com/dev/,path/sub'), 'sub')
 
246
        s = SFTPTransport('sftp://host.com/dev/%path', clone_from=fake)
 
247
        self.assertEquals(s.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
248
 
 
249
    def test_parse_invalid_url(self):
 
250
        from bzrlib.transport.sftp import SFTPTransport, TransportError
 
251
        try:
 
252
            s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
 
253
                              clone_from=fake)
 
254
            self.fail('expected exception not raised')
 
255
        except TransportError, e:
 
256
            self.assertEquals(str(e), 
 
257
                    '~janneke: invalid port number')
199
258
 
200
259
 
201
260
class SFTPBranchTest(TestCaseWithSFTPServer):
202
261
    """Test some stuff when accessing a bzr Branch over sftp"""
203
262
 
204
263
    def test_lock_file(self):
205
 
        # old format branches use a special lock file on sftp.
206
 
        b = self.make_branch('', format=bzrdir.BzrDirFormat6())
207
 
        b = bzrlib.branch.Branch.open(self.get_url())
 
264
        """Make sure that a Branch accessed over sftp tries to lock itself."""
 
265
        from bzrlib.branch import Branch
 
266
 
 
267
        self.delayed_setup()
 
268
        b = Branch.initialize(self._sftp_url)
208
269
        self.failUnlessExists('.bzr/')
209
270
        self.failUnlessExists('.bzr/branch-format')
210
271
        self.failUnlessExists('.bzr/branch-lock')
211
272
 
212
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
273
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
213
274
        b.lock_write()
214
275
        self.failUnlessExists('.bzr/branch-lock.write-lock')
215
276
        b.unlock()
216
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
277
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
 
278
 
 
279
    def test_no_working_tree(self):
 
280
        from bzrlib.branch import Branch
 
281
        self.delayed_setup()
 
282
        b = Branch.initialize(self._sftp_url)
 
283
        self.assertRaises(errors.NoWorkingTree, b.working_tree)
217
284
 
218
285
    def test_push_support(self):
 
286
        from bzrlib.branch import Branch
 
287
        self.delayed_setup()
 
288
 
219
289
        self.build_tree(['a/', 'a/foo'])
220
 
        t = bzrdir.BzrDir.create_standalone_workingtree('a')
221
 
        b = t.branch
 
290
        b = Branch.initialize('a')
 
291
        t = b.working_tree()
222
292
        t.add('foo')
223
293
        t.commit('foo', rev_id='a1')
224
294
 
225
 
        b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
 
295
        os.mkdir('b')
 
296
        b2 = Branch.initialize(self._sftp_url + 'b')
226
297
        b2.pull(b)
227
298
 
228
299
        self.assertEquals(b2.revision_history(), ['a1'])
229
300
 
230
 
        open('a/foo', 'wt').write('something new in foo\n')
231
 
        t.commit('new', rev_id='a2')
232
 
        b2.pull(b)
233
 
 
234
 
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
235
 
 
236
 
 
237
 
class SSHVendorConnection(TestCaseWithSFTPServer):
238
 
    """Test that the ssh vendors can all connect.
239
 
 
240
 
    Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
241
 
 
242
 
    We have 3 sftp implementations in the test suite:
243
 
      'loopback': Doesn't use ssh, just uses a local socket. Most tests are
244
 
                  done this way to save the handshaking time, so it is not
245
 
                  tested again here
246
 
      'none':     This uses paramiko's built-in ssh client and server, and layers
247
 
                  sftp on top of it.
248
 
      None:       If 'ssh' exists on the machine, then it will be spawned as a
249
 
                  child process.
250
 
    """
251
 
 
252
 
    def setUp(self):
253
 
        super(SSHVendorConnection, self).setUp()
254
 
 
255
 
        def create_server():
256
 
            """Just a wrapper so that when created, it will set _vendor"""
257
 
            # SFTPFullAbsoluteServer can handle any vendor,
258
 
            # it just needs to be set between the time it is instantiated
259
 
            # and the time .setUp() is called
260
 
            server = stub_sftp.SFTPFullAbsoluteServer()
261
 
            server._vendor = self._test_vendor
262
 
            return server
263
 
        self._test_vendor = 'loopback'
264
 
        self.vfs_transport_server = create_server
265
 
        f = open('a_file', 'wb')
266
 
        try:
267
 
            f.write('foobar\n')
268
 
        finally:
269
 
            f.close()
270
 
 
271
 
    def set_vendor(self, vendor):
272
 
        self._test_vendor = vendor
273
 
 
274
 
    def test_connection_paramiko(self):
275
 
        from bzrlib.transport import ssh
276
 
        self.set_vendor(ssh.ParamikoVendor())
277
 
        t = self.get_transport()
278
 
        self.assertEqual('foobar\n', t.get('a_file').read())
279
 
 
280
 
    def test_connection_vendor(self):
281
 
        raise TestSkipped("We don't test spawning real ssh,"
282
 
                          " because it prompts for a password."
283
 
                          " Enable this test if we figure out"
284
 
                          " how to prevent this.")
285
 
        self.set_vendor(None)
286
 
        t = self.get_transport()
287
 
        self.assertEqual('foobar\n', t.get('a_file').read())
288
 
 
289
 
 
290
 
class SSHVendorBadConnection(TestCaseWithTransport):
291
 
    """Test that the ssh vendors handle bad connection properly
292
 
 
293
 
    We don't subclass TestCaseWithSFTPServer, because we don't actually
294
 
    need an SFTP connection.
295
 
    """
296
 
 
297
 
    def setUp(self):
298
 
        self.requireFeature(features.paramiko)
299
 
        super(SSHVendorBadConnection, self).setUp()
300
 
 
301
 
        # open a random port, so we know nobody else is using it
302
 
        # but don't actually listen on the port.
303
 
        s = socket.socket()
304
 
        s.bind(('localhost', 0))
305
 
        self.addCleanup(s.close)
306
 
        self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
307
 
 
308
 
    def set_vendor(self, vendor):
309
 
        from bzrlib.transport import ssh
310
 
        self.overrideAttr(ssh._ssh_vendor_manager, '_cached_ssh_vendor', vendor)
311
 
 
312
 
    def test_bad_connection_paramiko(self):
313
 
        """Test that a real connection attempt raises the right error"""
314
 
        from bzrlib.transport import ssh
315
 
        self.set_vendor(ssh.ParamikoVendor())
316
 
        t = bzrlib.transport.get_transport(self.bogus_url)
317
 
        self.assertRaises(errors.ConnectionError, t.get, 'foobar')
318
 
 
319
 
    def test_bad_connection_ssh(self):
320
 
        """None => auto-detect vendor"""
321
 
        self.set_vendor(None)
322
 
        # This is how I would normally test the connection code
323
 
        # it makes it very clear what we are testing.
324
 
        # However, 'ssh' will create stipple on the output, so instead
325
 
        # I'm using run_bzr_subprocess, and parsing the output
326
 
        # try:
327
 
        #     t = bzrlib.transport.get_transport(self.bogus_url)
328
 
        # except errors.ConnectionError:
329
 
        #     # Correct error
330
 
        #     pass
331
 
        # except errors.NameError, e:
332
 
        #     if 'SSHException' in str(e):
333
 
        #         raise TestSkipped('Known NameError bug in paramiko 1.6.1')
334
 
        #     raise
335
 
        # else:
336
 
        #     self.fail('Excepted ConnectionError to be raised')
337
 
 
338
 
        out, err = self.run_bzr_subprocess(['log', self.bogus_url], retcode=3)
339
 
        self.assertEqual('', out)
340
 
        if "NameError: global name 'SSHException'" in err:
341
 
            # We aren't fixing this bug, because it is a bug in
342
 
            # paramiko, but we know about it, so we don't have to
343
 
            # fail the test
344
 
            raise TestSkipped('Known NameError bug with paramiko-1.6.1')
345
 
        self.assertContainsRe(err, r'bzr: ERROR: Unable to connect to SSH host'
346
 
                                   r' 127\.0\.0\.1:\d+; ')
347
 
 
348
 
 
349
 
class SFTPLatencyKnob(TestCaseWithSFTPServer):
350
 
    """Test that the testing SFTPServer's latency knob works."""
351
 
 
352
 
    def test_latency_knob_slows_transport(self):
353
 
        # change the latency knob to 500ms. We take about 40ms for a
354
 
        # loopback connection ordinarily.
355
 
        start_time = time.time()
356
 
        self.get_server().add_latency = 0.5
357
 
        transport = self.get_transport()
358
 
        transport.has('not me') # Force connection by issuing a request
359
 
        with_latency_knob_time = time.time() - start_time
360
 
        self.assertTrue(with_latency_knob_time > 0.4)
361
 
 
362
 
    def test_default(self):
363
 
        # This test is potentially brittle: under extremely high machine load
364
 
        # it could fail, but that is quite unlikely
365
 
        raise TestSkipped('Timing-sensitive test')
366
 
        start_time = time.time()
367
 
        transport = self.get_transport()
368
 
        transport.has('not me') # Force connection by issuing a request
369
 
        regular_time = time.time() - start_time
370
 
        self.assertTrue(regular_time < 0.5)
371
 
 
372
 
 
373
 
class FakeSocket(object):
374
 
    """Fake socket object used to test the SocketDelay wrapper without
375
 
    using a real socket.
376
 
    """
377
 
 
378
 
    def __init__(self):
379
 
        self._data = ""
380
 
 
381
 
    def send(self, data, flags=0):
382
 
        self._data += data
383
 
        return len(data)
384
 
 
385
 
    def sendall(self, data, flags=0):
386
 
        self._data += data
387
 
        return len(data)
388
 
 
389
 
    def recv(self, size, flags=0):
390
 
        if size < len(self._data):
391
 
            result = self._data[:size]
392
 
            self._data = self._data[size:]
393
 
            return result
394
 
        else:
395
 
            result = self._data
396
 
            self._data = ""
397
 
            return result
398
 
 
399
 
 
400
 
class TestSocketDelay(TestCase):
401
 
 
402
 
    def setUp(self):
403
 
        TestCase.setUp(self)
404
 
        self.requireFeature(features.paramiko)
405
 
 
406
 
    def test_delay(self):
407
 
        sending = FakeSocket()
408
 
        receiving = stub_sftp.SocketDelay(sending, 0.1, bandwidth=1000000,
409
 
                                          really_sleep=False)
410
 
        # check that simulated time is charged only per round-trip:
411
 
        t1 = stub_sftp.SocketDelay.simulated_time
412
 
        receiving.send("connect1")
413
 
        self.assertEqual(sending.recv(1024), "connect1")
414
 
        t2 = stub_sftp.SocketDelay.simulated_time
415
 
        self.assertAlmostEqual(t2 - t1, 0.1)
416
 
        receiving.send("connect2")
417
 
        self.assertEqual(sending.recv(1024), "connect2")
418
 
        sending.send("hello")
419
 
        self.assertEqual(receiving.recv(1024), "hello")
420
 
        t3 = stub_sftp.SocketDelay.simulated_time
421
 
        self.assertAlmostEqual(t3 - t2, 0.1)
422
 
        sending.send("hello")
423
 
        self.assertEqual(receiving.recv(1024), "hello")
424
 
        sending.send("hello")
425
 
        self.assertEqual(receiving.recv(1024), "hello")
426
 
        sending.send("hello")
427
 
        self.assertEqual(receiving.recv(1024), "hello")
428
 
        t4 = stub_sftp.SocketDelay.simulated_time
429
 
        self.assertAlmostEqual(t4, t3)
430
 
 
431
 
    def test_bandwidth(self):
432
 
        sending = FakeSocket()
433
 
        receiving = stub_sftp.SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
434
 
                                          really_sleep=False)
435
 
        # check that simulated time is charged only per round-trip:
436
 
        t1 = stub_sftp.SocketDelay.simulated_time
437
 
        receiving.send("connect")
438
 
        self.assertEqual(sending.recv(1024), "connect")
439
 
        sending.send("a" * 100)
440
 
        self.assertEqual(receiving.recv(1024), "a" * 100)
441
 
        t2 = stub_sftp.SocketDelay.simulated_time
442
 
        self.assertAlmostEqual(t2 - t1, 100 + 7)
443
 
 
444
 
 
445
 
class ReadvFile(object):
446
 
    """An object that acts like Paramiko's SFTPFile.readv()"""
447
 
 
448
 
    def __init__(self, data):
449
 
        self._data = data
450
 
 
451
 
    def readv(self, requests):
452
 
        for start, length in requests:
453
 
            yield self._data[start:start+length]
454
 
 
455
 
 
456
 
def _null_report_activity(*a, **k):
457
 
    pass
458
 
 
459
 
 
460
 
class Test_SFTPReadvHelper(tests.TestCase):
461
 
 
462
 
    def checkGetRequests(self, expected_requests, offsets):
463
 
        self.requireFeature(features.paramiko)
464
 
        helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test',
465
 
            _null_report_activity)
466
 
        self.assertEqual(expected_requests, helper._get_requests())
467
 
 
468
 
    def test__get_requests(self):
469
 
        # Small single requests become a single readv request
470
 
        self.checkGetRequests([(0, 100)],
471
 
                              [(0, 20), (30, 50), (20, 10), (80, 20)])
472
 
        # Non-contiguous ranges are given as multiple requests
473
 
        self.checkGetRequests([(0, 20), (30, 50)],
474
 
                              [(10, 10), (30, 20), (0, 10), (50, 30)])
475
 
        # Ranges larger than _max_request_size (32kB) are broken up into
476
 
        # multiple requests, even if it actually spans multiple logical
477
 
        # requests
478
 
        self.checkGetRequests([(0, 32768), (32768, 32768), (65536, 464)],
479
 
                              [(0, 40000), (40000, 100), (40100, 1900),
480
 
                               (42000, 24000)])
481
 
 
482
 
    def checkRequestAndYield(self, expected, data, offsets):
483
 
        self.requireFeature(features.paramiko)
484
 
        helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test',
485
 
            _null_report_activity)
486
 
        data_f = ReadvFile(data)
487
 
        result = list(helper.request_and_yield_offsets(data_f))
488
 
        self.assertEqual(expected, result)
489
 
 
490
 
    def test_request_and_yield_offsets(self):
491
 
        data = 'abcdefghijklmnopqrstuvwxyz'
492
 
        self.checkRequestAndYield([(0, 'a'), (5, 'f'), (10, 'klm')], data,
493
 
                                  [(0, 1), (5, 1), (10, 3)])
494
 
        # Should combine requests, and split them again
495
 
        self.checkRequestAndYield([(0, 'a'), (1, 'b'), (10, 'klm')], data,
496
 
                                  [(0, 1), (1, 1), (10, 3)])
497
 
        # Out of order requests. The requests should get combined, but then be
498
 
        # yielded out-of-order. We also need one that is at the end of a
499
 
        # previous range. See bug #293746
500
 
        self.checkRequestAndYield([(0, 'a'), (10, 'k'), (4, 'efg'), (1, 'bcd')],
501
 
                                  data, [(0, 1), (10, 1), (4, 3), (1, 3)])
502
 
 
503
 
 
504
 
class TestUsesAuthConfig(TestCaseWithSFTPServer):
505
 
    """Test that AuthenticationConfig can supply default usernames."""
506
 
 
507
 
    def get_transport_for_connection(self, set_config):
508
 
        port = self.get_server()._listener.port
509
 
        if set_config:
510
 
            conf = config.AuthenticationConfig()
511
 
            conf._get_config().update(
512
 
                {'sftptest': {'scheme': 'ssh', 'port': port, 'user': 'bar'}})
513
 
            conf._save()
514
 
        t = get_transport('sftp://localhost:%d' % port)
515
 
        # force a connection to be performed.
516
 
        t.has('foo')
517
 
        return t
518
 
 
519
 
    def test_sftp_uses_config(self):
520
 
        t = self.get_transport_for_connection(set_config=True)
521
 
        self.assertEqual('bar', t._get_credentials()[0])
522
 
 
523
 
    def test_sftp_is_none_if_no_config(self):
524
 
        t = self.get_transport_for_connection(set_config=False)
525
 
        self.assertIs(None, t._get_credentials()[0])
526
 
 
527
 
    def test_sftp_doesnt_prompt_username(self):
528
 
        stdout = tests.StringIOWrapper()
529
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n', stdout=stdout)
530
 
        t = self.get_transport_for_connection(set_config=False)
531
 
        self.assertIs(None, t._get_credentials()[0])
532
 
        # No prompts should've been printed, stdin shouldn't have been read
533
 
        self.assertEquals("", stdout.getvalue())
534
 
        self.assertEquals(0, ui.ui_factory.stdin.tell())
 
301
 
 
302
if not paramiko_loaded:
 
303
    # TODO: Skip these
 
304
    del SFTPTransportTest
 
305
    del SFTPNonServerTest
 
306
    del SFTPBranchTest