~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp_transport.py

[merge] update from bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
import socket
19
19
import threading
20
20
 
21
 
from bzrlib.tests import TestCaseInTempDir, TestCase, TestSkipped
22
 
from bzrlib.tests.test_transport import TestTransportMixIn
 
21
from bzrlib.branch import Branch
23
22
import bzrlib.errors as errors
24
23
from bzrlib.osutils import pathjoin, lexists
 
24
from bzrlib.tests import TestCaseInTempDir, TestCase, TestSkipped
 
25
import bzrlib.transport
 
26
from bzrlib.workingtree import WorkingTree
25
27
 
26
28
try:
27
29
    import paramiko
28
 
    from stub_sftp import StubServer, StubSFTPServer
29
30
    paramiko_loaded = True
30
31
except ImportError:
31
32
    paramiko_loaded = False
32
33
 
33
 
# XXX: 20051124 jamesh
34
 
# The tests currently pop up a password prompt when an external ssh
35
 
# is used.  This forces the use of the paramiko implementation.
36
 
if paramiko_loaded:
37
 
    import bzrlib.transport.sftp
38
 
    bzrlib.transport.sftp._ssh_vendor = 'none'
39
 
 
40
 
 
41
 
STUB_SERVER_KEY = """
42
 
-----BEGIN RSA PRIVATE KEY-----
43
 
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
44
 
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
45
 
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
46
 
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
47
 
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
48
 
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
49
 
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
50
 
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
51
 
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
52
 
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
53
 
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
54
 
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
55
 
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
56
 
-----END RSA PRIVATE KEY-----
57
 
"""
58
 
    
59
 
 
60
 
class SingleListener (threading.Thread):
61
 
    def __init__(self, callback):
62
 
        threading.Thread.__init__(self)
63
 
        self._callback = callback
64
 
        self._socket = socket.socket()
65
 
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
66
 
        self._socket.bind(('localhost', 0))
67
 
        self._socket.listen(1)
68
 
        self.port = self._socket.getsockname()[1]
69
 
        self.stop_event = threading.Event()
70
 
 
71
 
    def run(self):
72
 
        s, _ = self._socket.accept()
73
 
        # now close the listen socket
74
 
        self._socket.close()
75
 
        self._callback(s, self.stop_event)
76
 
    
77
 
    def stop(self):
78
 
        self.stop_event.set()
79
 
        # We should consider waiting for the other thread
80
 
        # to stop, because otherwise we get spurious
81
 
        #   bzr: ERROR: Socket exception: Connection reset by peer (54)
82
 
        # because the test suite finishes before the thread has a chance
83
 
        # to close. (Especially when only running a few tests)
84
 
        
85
 
        
86
 
class TestCaseWithSFTPServer (TestCaseInTempDir):
87
 
    """
88
 
    Execute a test case with a stub SFTP server, serving files from the local
89
 
    filesystem over the loopback network.
90
 
    """
91
 
    
92
 
    def _run_server(self, s, stop_event):
93
 
        ssh_server = paramiko.Transport(s)
94
 
        key_file = pathjoin(self._root, 'test_rsa.key')
95
 
        file(key_file, 'w').write(STUB_SERVER_KEY)
96
 
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
97
 
        ssh_server.add_server_key(host_key)
98
 
        server = StubServer(self)
99
 
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
100
 
        event = threading.Event()
101
 
        ssh_server.start_server(event, server)
102
 
        event.wait(5.0)
103
 
        stop_event.wait(30.0)
 
34
 
 
35
class TestCaseWithSFTPServer(TestCaseInTempDir):
 
36
    """A test case base class that provides a sftp server on localhost."""
104
37
 
105
38
    def setUp(self):
106
39
        if not paramiko_loaded:
107
40
            raise TestSkipped('you must have paramiko to run this test')
108
 
        TestCaseInTempDir.setUp(self)
 
41
        super(TestCaseWithSFTPServer, self).setUp()
 
42
        from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer, SFTPServer
 
43
        if getattr(self, '_full_handshake', False):
 
44
            self.server = SFTPServer()
 
45
        else:
 
46
            self._full_handshake = False
 
47
            if getattr(self, '_get_remote_is_absolute', None) is None:
 
48
                self._get_remote_is_absolute = True
 
49
            if self._get_remote_is_absolute:
 
50
                self.server = SFTPAbsoluteServer()
 
51
            else:
 
52
                self.server = SFTPHomeDirServer()
 
53
        self.server.setUp()
 
54
        self.addCleanup(self.server.tearDown)
 
55
        if self._full_handshake:
 
56
            self._sftp_url = self.server._get_sftp_url("")
 
57
        else:
 
58
            self._sftp_url = self.server.get_url()
109
59
        self._root = self.test_dir
 
60
        # Set to a string in setUp to give sftp server a new homedir.
 
61
        self._override_home = None
110
62
        self._is_setup = False
111
 
 
112
 
    def delayed_setup(self):
113
 
        # some tests are just stubs that call setUp and then immediately call
114
 
        # tearDwon.  so don't create the port listener until get_transport is
115
 
        # called and we know we're in an actual test.
116
 
        if self._is_setup:
117
 
            return
118
 
        self._listener = SingleListener(self._run_server)
119
 
        self._listener.setDaemon(True)
120
 
        self._listener.start()        
121
 
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
122
 
        self._is_setup = True
123
 
        
124
 
    def tearDown(self):
125
 
        try:
126
 
            self._listener.stop()
127
 
        except AttributeError:
128
 
            pass
129
 
        TestCaseInTempDir.tearDown(self)
130
 
 
131
 
        
132
 
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
133
 
    readonly = False
134
 
 
135
 
    def setUp(self):
136
 
        TestCaseWithSFTPServer.setUp(self)
137
63
        self.sftplogs = []
138
64
 
139
 
    def log(self, *args):
140
 
        """Override the default log to grab sftp server messages"""
141
 
        TestCaseWithSFTPServer.log(self, *args)
142
 
        if args and args[0].startswith('sftpserver'):
143
 
            self.sftplogs.append(args[0])
144
 
 
145
 
    def get_transport(self):
146
 
        self.delayed_setup()
147
 
        from bzrlib.transport.sftp import SFTPTransport
148
 
        url = self._sftp_url
149
 
        return SFTPTransport(url)
 
65
    def get_remote_url(self, relpath_to_test_root):
 
66
        # FIXME use urljoin ?
 
67
        return self._sftp_url + '/' + relpath_to_test_root
 
68
 
 
69
    def get_transport(self, path=None):
 
70
        """Return a transport relative to self._test_root."""
 
71
        from bzrlib.transport import get_transport
 
72
        transport = get_transport(self._sftp_url)
 
73
        if path is None:
 
74
            return transport
 
75
        else:
 
76
            return transport.clone(path)
 
77
 
 
78
 
 
79
class SFTPLockTests (TestCaseWithSFTPServer):
150
80
 
151
81
    def test_sftp_locks(self):
152
82
        from bzrlib.errors import LockError
175
105
 
176
106
    def test_multiple_connections(self):
177
107
        t = self.get_transport()
178
 
        self.assertEquals(self.sftplogs, 
179
 
                ['sftpserver - authorizing: foo'
180
 
               , 'sftpserver - channel request: session, 1'])
181
 
        self.sftplogs = []
 
108
        self.assertTrue('sftpserver - new connection' in self.server.logs)
 
109
        self.server.logs = []
182
110
        # The second request should reuse the first connection
183
111
        # SingleListener only allows for a single connection,
184
112
        # So the next line fails unless the connection is reused
185
113
        t2 = self.get_transport()
186
 
        self.assertEquals(self.sftplogs, [])
 
114
        self.assertEquals(self.server.logs, [])
 
115
 
 
116
 
 
117
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
 
118
    """Test the SFTP transport with homedir based relative paths."""
 
119
 
 
120
    def test__remote_path(self):
 
121
        t = self.get_transport()
 
122
        # try what is currently used:
 
123
        # remote path = self._abspath(relpath)
 
124
        self.assertEqual(self._root + '/relative', t._remote_path('relative'))
 
125
        # we dont os.path.join because windows gives us the wrong path
 
126
        root_segments = self._root.split('/')
 
127
        root_parent = '/'.join(root_segments[:-1])
 
128
        # .. should be honoured
 
129
        self.assertEqual(root_parent + '/sibling', t._remote_path('../sibling'))
 
130
        # /  should be illegal ?
 
131
        ### FIXME decide and then test for all transports. RBC20051208
 
132
 
 
133
 
 
134
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
 
135
    """Test the SFTP transport with homedir based relative paths."""
 
136
 
 
137
    def setUp(self):
 
138
        self._get_remote_is_absolute = False
 
139
        super(SFTPTransportTestRelative, self).setUp()
 
140
 
 
141
    def test__remote_path_relative_root(self):
 
142
        # relative paths are preserved
 
143
        t = self.get_transport('')
 
144
        self.assertEqual('a', t._remote_path('a'))
187
145
 
188
146
 
189
147
class FakeSFTPTransport (object):
199
157
 
200
158
    def test_parse_url(self):
201
159
        from bzrlib.transport.sftp import SFTPTransport
202
 
        s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
 
160
        s = SFTPTransport('sftp://simple.example.com/home/source', clone_from=fake)
203
161
        self.assertEquals(s._host, 'simple.example.com')
204
162
        self.assertEquals(s._port, None)
205
163
        self.assertEquals(s._path, '/home/source')
206
164
        self.failUnless(s._password is None)
207
165
 
208
 
        self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source')
209
 
        
210
 
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
 
166
        self.assertEquals(s.base, 'sftp://simple.example.com/home/source/')
 
167
 
 
168
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative', clone_from=fake)
211
169
        self.assertEquals(s._host, 'example.com')
212
170
        self.assertEquals(s._port, 2222)
213
171
        self.assertEquals(s._username, 'robey')
215
173
        self.assertEquals(s._path, 'relative')
216
174
 
217
175
        # Base should not keep track of the password
218
 
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative')
219
 
 
220
 
        # Double slash should be accepted instead of using %2F
221
 
        s = SFTPTransport('sftp://user@example.com:22//absolute/path', clone_from=fake)
222
 
        self.assertEquals(s._host, 'example.com')
223
 
        self.assertEquals(s._port, 22)
224
 
        self.assertEquals(s._username, 'user')
225
 
        self.assertEquals(s._password, None)
226
 
        self.assertEquals(s._path, '/absolute/path')
227
 
 
228
 
        # Also, don't show the port if it is the default 22
229
 
        self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path')
 
176
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/~/relative/')
230
177
 
231
178
    def test_relpath(self):
232
179
        from bzrlib.transport.sftp import SFTPTransport
233
180
        from bzrlib.errors import PathNotChild
234
181
 
235
 
        s = SFTPTransport('sftp://user@host.com//abs/path', clone_from=fake)
236
 
        self.assertEquals(s.relpath('sftp://user@host.com//abs/path/sub'), 'sub')
 
182
        s = SFTPTransport('sftp://user@host.com/abs/path', clone_from=fake)
 
183
        self.assertEquals(s.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
237
184
        # Can't test this one, because we actually get an AssertionError
238
185
        # TODO: Consider raising an exception rather than an assert
239
 
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com//abs/path/sub')
240
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com//abs/path/sub')
241
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com//abs/path/sub')
242
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33//abs/path/sub')
243
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/abs/path/sub')
 
186
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com/abs/path/sub')
 
187
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com/abs/path/sub')
 
188
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com/abs/path/sub')
 
189
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33/abs/path/sub')
 
190
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/~/rel/path/sub')
244
191
 
245
192
        # Make sure it works when we don't supply a username
246
 
        s = SFTPTransport('sftp://host.com//abs/path', clone_from=fake)
247
 
        self.assertEquals(s.relpath('sftp://host.com//abs/path/sub'), 'sub')
 
193
        s = SFTPTransport('sftp://host.com/abs/path', clone_from=fake)
 
194
        self.assertEquals(s.relpath('sftp://host.com/abs/path/sub'), 'sub')
248
195
 
249
196
        # Make sure it works when parts of the path will be url encoded
250
197
        # TODO: These may be incorrect, we might need to urllib.urlencode() before
270
217
 
271
218
    def test_lock_file(self):
272
219
        """Make sure that a Branch accessed over sftp tries to lock itself."""
273
 
        from bzrlib.branch import Branch
274
 
 
275
 
        self.delayed_setup()
276
 
        b = Branch.initialize(self._sftp_url)
 
220
        b = Branch.create(self._sftp_url)
277
221
        self.failUnlessExists('.bzr/')
278
222
        self.failUnlessExists('.bzr/branch-format')
279
223
        self.failUnlessExists('.bzr/branch-lock')
285
229
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
286
230
 
287
231
    def test_no_working_tree(self):
288
 
        from bzrlib.branch import Branch
289
 
        self.delayed_setup()
290
 
        b = Branch.initialize(self._sftp_url)
 
232
        b = Branch.create(self._sftp_url)
291
233
        self.assertRaises(errors.NoWorkingTree, b.working_tree)
292
234
 
293
235
    def test_push_support(self):
294
 
        from bzrlib.branch import Branch
295
 
        self.delayed_setup()
296
 
 
297
236
        self.build_tree(['a/', 'a/foo'])
298
 
        b = Branch.initialize('a')
299
 
        t = b.working_tree()
 
237
        t = WorkingTree.create_standalone('a')
 
238
        b = t.branch
300
239
        t.add('foo')
301
240
        t.commit('foo', rev_id='a1')
302
241
 
303
242
        os.mkdir('b')
304
 
        b2 = Branch.initialize(self._sftp_url + 'b')
 
243
        b2 = Branch.create(self._sftp_url + '/b')
305
244
        b2.pull(b)
306
245
 
307
246
        self.assertEquals(b2.revision_history(), ['a1'])
313
252
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
314
253
 
315
254
 
 
255
class SFTPFullHandshakingTest(TestCaseWithSFTPServer):
 
256
    """Verify that a full-handshake (SSH over loopback TCP) sftp connection works."""
 
257
    _full_handshake = True
 
258
    
 
259
    def test_connection(self):
 
260
        self.get_transport()