~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp.py

  • Committer: Robert Collins
  • Date: 2005-12-24 02:20:45 UTC
  • mto: (1185.50.57 bzr-jam-integration)
  • mto: This revision was merged to the branch mainline in revision 1550.
  • Revision ID: robertc@robertcollins.net-20051224022045-14efc8dfa0e1a4e9
Start tests for api usage.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
2
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
 
#
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
 
2
 
4
3
# This program is free software; you can redistribute it and/or modify
5
4
# it under the terms of the GNU General Public License as published by
6
5
# the Free Software Foundation; either version 2 of the License, or
7
6
# (at your option) any later version.
8
 
#
 
7
 
9
8
# This program is distributed in the hope that it will be useful,
10
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
11
# GNU General Public License for more details.
13
 
#
 
12
 
14
13
# You should have received a copy of the GNU General Public License
15
14
# along with this program; if not, write to the Free Software
16
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
16
 
18
17
import os
19
18
import socket
20
 
import sys
21
19
import threading
22
 
import time
23
20
 
24
 
import bzrlib.bzrdir as bzrdir
 
21
from bzrlib.tests import TestCaseInTempDir, TestCase
 
22
from bzrlib.tests.test_transport import TestTransportMixIn
25
23
import bzrlib.errors as errors
26
 
from bzrlib.osutils import pathjoin, lexists, set_or_unset_env
27
 
from bzrlib.tests import TestCaseWithTransport, TestCase, TestSkipped
28
 
from bzrlib.tests.HttpServer import HttpServer
29
 
import bzrlib.transport
30
 
from bzrlib.transport import get_transport
31
 
import bzrlib.transport.http
32
 
from bzrlib.workingtree import WorkingTree
33
24
 
34
25
try:
35
26
    import paramiko
 
27
    from stub_sftp import StubServer, StubSFTPServer
36
28
    paramiko_loaded = True
37
29
except ImportError:
38
30
    paramiko_loaded = False
39
31
 
40
 
 
41
 
def set_test_transport_to_sftp(testcase):
42
 
    """A helper to set transports on test case instances."""
43
 
    from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
44
 
    if getattr(testcase, '_get_remote_is_absolute', None) is None:
45
 
        testcase._get_remote_is_absolute = True
46
 
    if testcase._get_remote_is_absolute:
47
 
        testcase.transport_server = SFTPAbsoluteServer
48
 
    else:
49
 
        testcase.transport_server = SFTPHomeDirServer
50
 
    testcase.transport_readonly_server = HttpServer
51
 
 
52
 
 
53
 
class TestCaseWithSFTPServer(TestCaseWithTransport):
54
 
    """A test case base class that provides a sftp server on localhost."""
55
 
 
56
 
    def setUp(self):
57
 
        super(TestCaseWithSFTPServer, self).setUp()
58
 
        if not paramiko_loaded:
59
 
            raise TestSkipped('you must have paramiko to run this test')
60
 
        set_test_transport_to_sftp(self)
61
 
 
62
 
    def get_transport(self, path=None):
63
 
        """Return a transport relative to self._test_root."""
64
 
        return bzrlib.transport.get_transport(self.get_url(path))
65
 
 
66
 
 
67
 
class SFTPLockTests (TestCaseWithSFTPServer):
 
32
# XXX: 20051124 jamesh
 
33
# The tests currently pop up a password prompt when an external ssh
 
34
# is used.  This forces the use of the paramiko implementation.
 
35
if paramiko_loaded:
 
36
    import bzrlib.transport.sftp
 
37
    bzrlib.transport.sftp._ssh_vendor = 'none'
 
38
 
 
39
 
 
40
STUB_SERVER_KEY = """
 
41
-----BEGIN RSA PRIVATE KEY-----
 
42
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
43
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
44
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
45
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
46
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
47
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
48
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
49
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
50
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
51
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
52
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
53
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
54
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
55
-----END RSA PRIVATE KEY-----
 
56
"""
 
57
    
 
58
 
 
59
class SingleListener (threading.Thread):
 
60
    def __init__(self, callback):
 
61
        threading.Thread.__init__(self)
 
62
        self._callback = callback
 
63
        self._socket = socket.socket()
 
64
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
65
        self._socket.bind(('localhost', 0))
 
66
        self._socket.listen(1)
 
67
        self.port = self._socket.getsockname()[1]
 
68
        self.stop_event = threading.Event()
 
69
 
 
70
    def run(self):
 
71
        s, _ = self._socket.accept()
 
72
        # now close the listen socket
 
73
        self._socket.close()
 
74
        self._callback(s, self.stop_event)
 
75
    
 
76
    def stop(self):
 
77
        self.stop_event.set()
 
78
        # We should consider waiting for the other thread
 
79
        # to stop, because otherwise we get spurious
 
80
        #   bzr: ERROR: Socket exception: Connection reset by peer (54)
 
81
        # because the test suite finishes before the thread has a chance
 
82
        # to close. (Especially when only running a few tests)
 
83
        
 
84
        
 
85
class TestCaseWithSFTPServer (TestCaseInTempDir):
 
86
    """
 
87
    Execute a test case with a stub SFTP server, serving files from the local
 
88
    filesystem over the loopback network.
 
89
    """
 
90
    
 
91
    def _run_server(self, s, stop_event):
 
92
        ssh_server = paramiko.Transport(s)
 
93
        key_file = os.path.join(self._root, 'test_rsa.key')
 
94
        file(key_file, 'w').write(STUB_SERVER_KEY)
 
95
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
96
        ssh_server.add_server_key(host_key)
 
97
        server = StubServer(self)
 
98
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
 
99
        event = threading.Event()
 
100
        ssh_server.start_server(event, server)
 
101
        event.wait(5.0)
 
102
        stop_event.wait(30.0)
 
103
 
 
104
    def setUp(self):
 
105
        TestCaseInTempDir.setUp(self)
 
106
        self._root = self.test_dir
 
107
        self._is_setup = False
 
108
 
 
109
    def delayed_setup(self):
 
110
        # some tests are just stubs that call setUp and then immediately call
 
111
        # tearDwon.  so don't create the port listener until get_transport is
 
112
        # called and we know we're in an actual test.
 
113
        if self._is_setup:
 
114
            return
 
115
        self._listener = SingleListener(self._run_server)
 
116
        self._listener.setDaemon(True)
 
117
        self._listener.start()        
 
118
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
 
119
        self._is_setup = True
 
120
        
 
121
    def tearDown(self):
 
122
        try:
 
123
            self._listener.stop()
 
124
        except AttributeError:
 
125
            pass
 
126
        TestCaseInTempDir.tearDown(self)
 
127
 
 
128
        
 
129
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
 
130
    readonly = False
 
131
 
 
132
    def setUp(self):
 
133
        TestCaseWithSFTPServer.setUp(self)
 
134
        self.sftplogs = []
 
135
 
 
136
    def log(self, *args):
 
137
        """Override the default log to grab sftp server messages"""
 
138
        TestCaseWithSFTPServer.log(self, *args)
 
139
        if args and args[0].startswith('sftpserver'):
 
140
            self.sftplogs.append(args[0])
 
141
 
 
142
    def get_transport(self):
 
143
        self.delayed_setup()
 
144
        from bzrlib.transport.sftp import SFTPTransport
 
145
        url = self._sftp_url
 
146
        return SFTPTransport(url)
68
147
 
69
148
    def test_sftp_locks(self):
70
149
        from bzrlib.errors import LockError
78
157
        self.assertRaises(LockError, t.lock_write, 'bogus')
79
158
 
80
159
        l.unlock()
81
 
        self.failIf(lexists('bogus.write-lock'))
 
160
        self.failIf(os.path.lexists('bogus.write-lock'))
82
161
 
83
162
        open('something.write-lock', 'wb').write('fake lock\n')
84
163
        self.assertRaises(LockError, t.lock_write, 'something')
93
172
 
94
173
    def test_multiple_connections(self):
95
174
        t = self.get_transport()
96
 
        self.assertTrue('sftpserver - new connection' in self.get_server().logs)
97
 
        self.get_server().logs = []
 
175
        self.assertEquals(self.sftplogs, 
 
176
                ['sftpserver - authorizing: foo'
 
177
               , 'sftpserver - channel request: session, 1'])
 
178
        self.sftplogs = []
98
179
        # The second request should reuse the first connection
99
180
        # SingleListener only allows for a single connection,
100
181
        # So the next line fails unless the connection is reused
101
182
        t2 = self.get_transport()
102
 
        self.assertEquals(self.get_server().logs, [])
103
 
 
104
 
 
105
 
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
106
 
    """Test the SFTP transport with homedir based relative paths."""
107
 
 
108
 
    def test__remote_path(self):
109
 
        t = self.get_transport()
110
 
        # This test require unix-like absolute path
111
 
        test_dir = self.test_dir
112
 
        if sys.platform == 'win32':
113
 
            # using hack suggested by John Meinel.
114
 
            # TODO: write another mock server for this test
115
 
            #       and use absolute path without drive letter
116
 
            test_dir = '/' + test_dir
117
 
        # try what is currently used:
118
 
        # remote path = self._abspath(relpath)
119
 
        self.assertEqual(test_dir + '/relative', t._remote_path('relative'))
120
 
        # we dont os.path.join because windows gives us the wrong path
121
 
        root_segments = test_dir.split('/')
122
 
        root_parent = '/'.join(root_segments[:-1])
123
 
        # .. should be honoured
124
 
        self.assertEqual(root_parent + '/sibling', t._remote_path('../sibling'))
125
 
        # /  should be illegal ?
126
 
        ### FIXME decide and then test for all transports. RBC20051208
127
 
 
128
 
 
129
 
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
130
 
    """Test the SFTP transport with homedir based relative paths."""
131
 
 
132
 
    def setUp(self):
133
 
        self._get_remote_is_absolute = False
134
 
        super(SFTPTransportTestRelativeRoot, self).setUp()
135
 
 
136
 
    def test__remote_path_relative_root(self):
137
 
        # relative paths are preserved
138
 
        t = self.get_transport('')
139
 
        # the remote path should be ''
140
 
        self.assertEqual('', t._path)
141
 
        self.assertEqual('a', t._remote_path('a'))
 
183
        self.assertEquals(self.sftplogs, [])
142
184
 
143
185
 
144
186
class FakeSFTPTransport (object):
147
189
 
148
190
 
149
191
class SFTPNonServerTest(TestCase):
150
 
    def setUp(self):
151
 
        TestCase.setUp(self)
152
 
        if not paramiko_loaded:
153
 
            raise TestSkipped('you must have paramiko to run this test')
154
 
 
155
192
    def test_parse_url(self):
156
193
        from bzrlib.transport.sftp import SFTPTransport
157
 
        s = SFTPTransport('sftp://simple.example.com/home/source', clone_from=fake)
 
194
        s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
158
195
        self.assertEquals(s._host, 'simple.example.com')
159
196
        self.assertEquals(s._port, None)
160
197
        self.assertEquals(s._path, '/home/source')
161
198
        self.failUnless(s._password is None)
162
199
 
163
 
        self.assertEquals(s.base, 'sftp://simple.example.com/home/source/')
164
 
 
165
 
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative', clone_from=fake)
 
200
        self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source')
 
201
        
 
202
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
166
203
        self.assertEquals(s._host, 'example.com')
167
204
        self.assertEquals(s._port, 2222)
168
205
        self.assertEquals(s._username, 'robey')
170
207
        self.assertEquals(s._path, 'relative')
171
208
 
172
209
        # Base should not keep track of the password
173
 
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/~/relative/')
 
210
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative')
 
211
 
 
212
        # Double slash should be accepted instead of using %2F
 
213
        s = SFTPTransport('sftp://user@example.com:22//absolute/path', clone_from=fake)
 
214
        self.assertEquals(s._host, 'example.com')
 
215
        self.assertEquals(s._port, 22)
 
216
        self.assertEquals(s._username, 'user')
 
217
        self.assertEquals(s._password, None)
 
218
        self.assertEquals(s._path, '/absolute/path')
 
219
 
 
220
        # Also, don't show the port if it is the default 22
 
221
        self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path')
174
222
 
175
223
    def test_relpath(self):
176
224
        from bzrlib.transport.sftp import SFTPTransport
177
225
        from bzrlib.errors import PathNotChild
178
226
 
179
 
        s = SFTPTransport('sftp://user@host.com/abs/path', clone_from=fake)
180
 
        self.assertEquals(s.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
227
        s = SFTPTransport('sftp://user@host.com//abs/path', clone_from=fake)
 
228
        self.assertEquals(s.relpath('sftp://user@host.com//abs/path/sub'), 'sub')
181
229
        # Can't test this one, because we actually get an AssertionError
182
230
        # TODO: Consider raising an exception rather than an assert
183
 
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com/abs/path/sub')
184
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com/abs/path/sub')
185
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com/abs/path/sub')
186
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33/abs/path/sub')
187
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/~/rel/path/sub')
 
231
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com//abs/path/sub')
 
232
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com//abs/path/sub')
 
233
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com//abs/path/sub')
 
234
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33//abs/path/sub')
 
235
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/abs/path/sub')
188
236
 
189
237
        # Make sure it works when we don't supply a username
190
 
        s = SFTPTransport('sftp://host.com/abs/path', clone_from=fake)
191
 
        self.assertEquals(s.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
238
        s = SFTPTransport('sftp://host.com//abs/path', clone_from=fake)
 
239
        self.assertEquals(s.relpath('sftp://host.com//abs/path/sub'), 'sub')
192
240
 
193
241
        # Make sure it works when parts of the path will be url encoded
194
242
        # TODO: These may be incorrect, we might need to urllib.urlencode() before
205
253
                              clone_from=fake)
206
254
            self.fail('expected exception not raised')
207
255
        except TransportError, e:
208
 
            self.assertEquals(str(e),
209
 
                    'Transport error: '
210
 
                    'invalid port number ~janneke in url:\n'
211
 
                    'sftp://lilypond.org:~janneke/public_html/bzr/gub ')
212
 
 
213
 
    def test_get_paramiko_vendor(self):
214
 
        """Test that if no 'ssh' is available we get builtin paramiko"""
215
 
        from bzrlib.transport import ssh
216
 
        # set '.' as the only location in the path, forcing no 'ssh' to exist
217
 
        orig_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
218
 
        orig_path = set_or_unset_env('PATH', '.')
219
 
        try:
220
 
            # No vendor defined yet, query for one
221
 
            ssh._ssh_vendor_manager.clear_cache()
222
 
            vendor = ssh._get_ssh_vendor()
223
 
            self.assertIsInstance(vendor, ssh.ParamikoVendor)
224
 
        finally:
225
 
            set_or_unset_env('PATH', orig_path)
226
 
            ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
227
 
 
228
 
    def test_abspath_root_sibling_server(self):
229
 
        from bzrlib.transport.sftp import SFTPSiblingAbsoluteServer
230
 
        server = SFTPSiblingAbsoluteServer()
231
 
        server.setUp()
232
 
        try:
233
 
            transport = get_transport(server.get_url())
234
 
            self.assertFalse(transport.abspath('/').endswith('/~/'))
235
 
            self.assertTrue(transport.abspath('/').endswith('/'))
236
 
            del transport
237
 
        finally:
238
 
            server.tearDown()
 
256
            self.assertEquals(str(e), 
 
257
                    '~janneke: invalid port number')
239
258
 
240
259
 
241
260
class SFTPBranchTest(TestCaseWithSFTPServer):
242
261
    """Test some stuff when accessing a bzr Branch over sftp"""
243
262
 
244
263
    def test_lock_file(self):
245
 
        # old format branches use a special lock file on sftp.
246
 
        b = self.make_branch('', format=bzrdir.BzrDirFormat6())
247
 
        b = bzrlib.branch.Branch.open(self.get_url())
 
264
        """Make sure that a Branch accessed over sftp tries to lock itself."""
 
265
        from bzrlib.branch import Branch
 
266
 
 
267
        self.delayed_setup()
 
268
        b = Branch.initialize(self._sftp_url)
248
269
        self.failUnlessExists('.bzr/')
249
270
        self.failUnlessExists('.bzr/branch-format')
250
271
        self.failUnlessExists('.bzr/branch-lock')
251
272
 
252
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
273
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
253
274
        b.lock_write()
254
275
        self.failUnlessExists('.bzr/branch-lock.write-lock')
255
276
        b.unlock()
256
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
277
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
 
278
 
 
279
    def test_no_working_tree(self):
 
280
        from bzrlib.branch import Branch
 
281
        self.delayed_setup()
 
282
        b = Branch.initialize(self._sftp_url)
 
283
        self.assertRaises(errors.NoWorkingTree, b.working_tree)
257
284
 
258
285
    def test_push_support(self):
 
286
        from bzrlib.branch import Branch
 
287
        self.delayed_setup()
 
288
 
259
289
        self.build_tree(['a/', 'a/foo'])
260
 
        t = bzrdir.BzrDir.create_standalone_workingtree('a')
261
 
        b = t.branch
 
290
        b = Branch.initialize('a')
 
291
        t = b.working_tree()
262
292
        t.add('foo')
263
293
        t.commit('foo', rev_id='a1')
264
294
 
265
 
        b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
 
295
        os.mkdir('b')
 
296
        b2 = Branch.initialize(self._sftp_url + 'b')
266
297
        b2.pull(b)
267
298
 
268
299
        self.assertEquals(b2.revision_history(), ['a1'])
269
300
 
270
 
        open('a/foo', 'wt').write('something new in foo\n')
271
 
        t.commit('new', rev_id='a2')
272
 
        b2.pull(b)
273
 
 
274
 
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
275
 
 
276
 
 
277
 
class SSHVendorConnection(TestCaseWithSFTPServer):
278
 
    """Test that the ssh vendors can all connect.
279
 
 
280
 
    Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
281
 
 
282
 
    We have 3 sftp implementations in the test suite:
283
 
      'loopback': Doesn't use ssh, just uses a local socket. Most tests are
284
 
                  done this way to save the handshaking time, so it is not
285
 
                  tested again here
286
 
      'none':     This uses paramiko's built-in ssh client and server, and layers
287
 
                  sftp on top of it.
288
 
      None:       If 'ssh' exists on the machine, then it will be spawned as a
289
 
                  child process.
290
 
    """
291
 
    
292
 
    def setUp(self):
293
 
        super(SSHVendorConnection, self).setUp()
294
 
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
295
 
 
296
 
        def create_server():
297
 
            """Just a wrapper so that when created, it will set _vendor"""
298
 
            # SFTPFullAbsoluteServer can handle any vendor,
299
 
            # it just needs to be set between the time it is instantiated
300
 
            # and the time .setUp() is called
301
 
            server = SFTPFullAbsoluteServer()
302
 
            server._vendor = self._test_vendor
303
 
            return server
304
 
        self._test_vendor = 'loopback'
305
 
        self.vfs_transport_server = create_server
306
 
        f = open('a_file', 'wb')
307
 
        try:
308
 
            f.write('foobar\n')
309
 
        finally:
310
 
            f.close()
311
 
 
312
 
    def set_vendor(self, vendor):
313
 
        self._test_vendor = vendor
314
 
 
315
 
    def test_connection_paramiko(self):
316
 
        from bzrlib.transport import ssh
317
 
        self.set_vendor(ssh.ParamikoVendor())
318
 
        t = self.get_transport()
319
 
        self.assertEqual('foobar\n', t.get('a_file').read())
320
 
 
321
 
    def test_connection_vendor(self):
322
 
        raise TestSkipped("We don't test spawning real ssh,"
323
 
                          " because it prompts for a password."
324
 
                          " Enable this test if we figure out"
325
 
                          " how to prevent this.")
326
 
        self.set_vendor(None)
327
 
        t = self.get_transport()
328
 
        self.assertEqual('foobar\n', t.get('a_file').read())
329
 
 
330
 
 
331
 
class SSHVendorBadConnection(TestCaseWithTransport):
332
 
    """Test that the ssh vendors handle bad connection properly
333
 
 
334
 
    We don't subclass TestCaseWithSFTPServer, because we don't actually
335
 
    need an SFTP connection.
336
 
    """
337
 
 
338
 
    def setUp(self):
339
 
        if not paramiko_loaded:
340
 
            raise TestSkipped('you must have paramiko to run this test')
341
 
        super(SSHVendorBadConnection, self).setUp()
342
 
        import bzrlib.transport.ssh
343
 
 
344
 
        # open a random port, so we know nobody else is using it
345
 
        # but don't actually listen on the port.
346
 
        s = socket.socket()
347
 
        s.bind(('localhost', 0))
348
 
        self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
349
 
 
350
 
        orig_vendor = bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor
351
 
        def reset():
352
 
            bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
353
 
            s.close()
354
 
        self.addCleanup(reset)
355
 
 
356
 
    def set_vendor(self, vendor):
357
 
        import bzrlib.transport.ssh
358
 
        bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = vendor
359
 
 
360
 
    def test_bad_connection_paramiko(self):
361
 
        """Test that a real connection attempt raises the right error"""
362
 
        from bzrlib.transport import ssh
363
 
        self.set_vendor(ssh.ParamikoVendor())
364
 
        self.assertRaises(errors.ConnectionError,
365
 
                          bzrlib.transport.get_transport, self.bogus_url)
366
 
 
367
 
    def test_bad_connection_ssh(self):
368
 
        """None => auto-detect vendor"""
369
 
        self.set_vendor(None)
370
 
        # This is how I would normally test the connection code
371
 
        # it makes it very clear what we are testing.
372
 
        # However, 'ssh' will create stipple on the output, so instead
373
 
        # I'm using run_bzr_subprocess, and parsing the output
374
 
        # try:
375
 
        #     t = bzrlib.transport.get_transport(self.bogus_url)
376
 
        # except errors.ConnectionError:
377
 
        #     # Correct error
378
 
        #     pass
379
 
        # except errors.NameError, e:
380
 
        #     if 'SSHException' in str(e):
381
 
        #         raise TestSkipped('Known NameError bug in paramiko 1.6.1')
382
 
        #     raise
383
 
        # else:
384
 
        #     self.fail('Excepted ConnectionError to be raised')
385
 
 
386
 
        out, err = self.run_bzr_subprocess('log', self.bogus_url, retcode=3)
387
 
        self.assertEqual('', out)
388
 
        if "NameError: global name 'SSHException'" in err:
389
 
            # We aren't fixing this bug, because it is a bug in
390
 
            # paramiko, but we know about it, so we don't have to
391
 
            # fail the test
392
 
            raise TestSkipped('Known NameError bug with paramiko-1.6.1')
393
 
        self.assertContainsRe(err, r'bzr: ERROR: Unable to connect to SSH host'
394
 
                                   r' 127\.0\.0\.1:\d+; ')
395
 
 
396
 
 
397
 
class SFTPLatencyKnob(TestCaseWithSFTPServer):
398
 
    """Test that the testing SFTPServer's latency knob works."""
399
 
 
400
 
    def test_latency_knob_slows_transport(self):
401
 
        # change the latency knob to 500ms. We take about 40ms for a 
402
 
        # loopback connection ordinarily.
403
 
        start_time = time.time()
404
 
        self.get_server().add_latency = 0.5
405
 
        transport = self.get_transport()
406
 
        with_latency_knob_time = time.time() - start_time
407
 
        self.assertTrue(with_latency_knob_time > 0.4)
408
 
 
409
 
    def test_default(self):
410
 
        # This test is potentially brittle: under extremely high machine load
411
 
        # it could fail, but that is quite unlikely
412
 
        start_time = time.time()
413
 
        transport = self.get_transport()
414
 
        regular_time = time.time() - start_time
415
 
        self.assertTrue(regular_time < 0.5)
416
 
 
417
 
 
418
 
class FakeSocket(object):
419
 
    """Fake socket object used to test the SocketDelay wrapper without
420
 
    using a real socket.
421
 
    """
422
 
 
423
 
    def __init__(self):
424
 
        self._data = ""
425
 
 
426
 
    def send(self, data, flags=0):
427
 
        self._data += data
428
 
        return len(data)
429
 
 
430
 
    def sendall(self, data, flags=0):
431
 
        self._data += data
432
 
        return len(data)
433
 
 
434
 
    def recv(self, size, flags=0):
435
 
        if size < len(self._data):
436
 
            result = self._data[:size]
437
 
            self._data = self._data[size:]
438
 
            return result
439
 
        else:
440
 
            result = self._data
441
 
            self._data = ""
442
 
            return result
443
 
 
444
 
 
445
 
class TestSocketDelay(TestCase):
446
 
 
447
 
    def setUp(self):
448
 
        TestCase.setUp(self)
449
 
        if not paramiko_loaded:
450
 
            raise TestSkipped('you must have paramiko to run this test')
451
 
 
452
 
    def test_delay(self):
453
 
        from bzrlib.transport.sftp import SocketDelay
454
 
        sending = FakeSocket()
455
 
        receiving = SocketDelay(sending, 0.1, bandwidth=1000000,
456
 
                                really_sleep=False)
457
 
        # check that simulated time is charged only per round-trip:
458
 
        t1 = SocketDelay.simulated_time
459
 
        receiving.send("connect1")
460
 
        self.assertEqual(sending.recv(1024), "connect1")
461
 
        t2 = SocketDelay.simulated_time
462
 
        self.assertAlmostEqual(t2 - t1, 0.1)
463
 
        receiving.send("connect2")
464
 
        self.assertEqual(sending.recv(1024), "connect2")
465
 
        sending.send("hello")
466
 
        self.assertEqual(receiving.recv(1024), "hello")
467
 
        t3 = SocketDelay.simulated_time
468
 
        self.assertAlmostEqual(t3 - t2, 0.1)
469
 
        sending.send("hello")
470
 
        self.assertEqual(receiving.recv(1024), "hello")
471
 
        sending.send("hello")
472
 
        self.assertEqual(receiving.recv(1024), "hello")
473
 
        sending.send("hello")
474
 
        self.assertEqual(receiving.recv(1024), "hello")
475
 
        t4 = SocketDelay.simulated_time
476
 
        self.assertAlmostEqual(t4, t3)
477
 
 
478
 
    def test_bandwidth(self):
479
 
        from bzrlib.transport.sftp import SocketDelay
480
 
        sending = FakeSocket()
481
 
        receiving = SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
482
 
                                really_sleep=False)
483
 
        # check that simulated time is charged only per round-trip:
484
 
        t1 = SocketDelay.simulated_time
485
 
        receiving.send("connect")
486
 
        self.assertEqual(sending.recv(1024), "connect")
487
 
        sending.send("a" * 100)
488
 
        self.assertEqual(receiving.recv(1024), "a" * 100)
489
 
        t2 = SocketDelay.simulated_time
490
 
        self.assertAlmostEqual(t2 - t1, 100 + 7)
491
 
 
492
 
 
 
301
 
 
302
if not paramiko_loaded:
 
303
    # TODO: Skip these
 
304
    del SFTPTransportTest
 
305
    del SFTPNonServerTest
 
306
    del SFTPBranchTest