~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp.py

  • Committer: Michael Ellerman
  • Date: 2005-12-10 22:11:13 UTC
  • mto: This revision was merged to the branch mainline in revision 1528.
  • Revision ID: michael@ellerman.id.au-20051210221113-99ca561aaab4661e
Simplify handling of DivergedBranches in cmd_pull()

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
import socket
19
19
import threading
20
20
 
21
 
from bzrlib.branch import Branch
 
21
from bzrlib.tests import TestCaseInTempDir, TestCase
 
22
from bzrlib.tests.test_transport import TestTransportMixIn
22
23
import bzrlib.errors as errors
23
 
from bzrlib.osutils import pathjoin, lexists
24
 
from bzrlib.tests import TestCaseInTempDir, TestCase, TestSkipped
25
 
import bzrlib.transport
26
24
 
27
25
try:
28
26
    import paramiko
 
27
    from stub_sftp import StubServer, StubSFTPServer
29
28
    paramiko_loaded = True
30
29
except ImportError:
31
30
    paramiko_loaded = False
32
31
 
33
 
 
34
 
class TestCaseWithSFTPServer(TestCaseInTempDir):
35
 
    """A test case base class that provides a sftp server on localhost."""
 
32
# XXX: 20051124 jamesh
 
33
# The tests currently pop up a password prompt when an external ssh
 
34
# is used.  This forces the use of the paramiko implementation.
 
35
if paramiko_loaded:
 
36
    import bzrlib.transport.sftp
 
37
    bzrlib.transport.sftp._ssh_vendor = 'none'
 
38
 
 
39
 
 
40
STUB_SERVER_KEY = """
 
41
-----BEGIN RSA PRIVATE KEY-----
 
42
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
43
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
44
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
45
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
46
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
47
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
48
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
49
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
50
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
51
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
52
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
53
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
54
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
55
-----END RSA PRIVATE KEY-----
 
56
"""
 
57
    
 
58
 
 
59
class SingleListener (threading.Thread):
 
60
    def __init__(self, callback):
 
61
        threading.Thread.__init__(self)
 
62
        self._callback = callback
 
63
        self._socket = socket.socket()
 
64
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
65
        self._socket.bind(('localhost', 0))
 
66
        self._socket.listen(1)
 
67
        self.port = self._socket.getsockname()[1]
 
68
        self.stop_event = threading.Event()
 
69
 
 
70
    def run(self):
 
71
        s, _ = self._socket.accept()
 
72
        # now close the listen socket
 
73
        self._socket.close()
 
74
        self._callback(s, self.stop_event)
 
75
    
 
76
    def stop(self):
 
77
        self.stop_event.set()
 
78
        # We should consider waiting for the other thread
 
79
        # to stop, because otherwise we get spurious
 
80
        #   bzr: ERROR: Socket exception: Connection reset by peer (54)
 
81
        # because the test suite finishes before the thread has a chance
 
82
        # to close. (Especially when only running a few tests)
 
83
        
 
84
        
 
85
class TestCaseWithSFTPServer (TestCaseInTempDir):
 
86
    """
 
87
    Execute a test case with a stub SFTP server, serving files from the local
 
88
    filesystem over the loopback network.
 
89
    """
 
90
    
 
91
    def _run_server(self, s, stop_event):
 
92
        ssh_server = paramiko.Transport(s)
 
93
        key_file = os.path.join(self._root, 'test_rsa.key')
 
94
        file(key_file, 'w').write(STUB_SERVER_KEY)
 
95
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
96
        ssh_server.add_server_key(host_key)
 
97
        server = StubServer(self)
 
98
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
 
99
        event = threading.Event()
 
100
        ssh_server.start_server(event, server)
 
101
        event.wait(5.0)
 
102
        stop_event.wait(30.0)
36
103
 
37
104
    def setUp(self):
38
 
        if not paramiko_loaded:
39
 
            raise TestSkipped('you must have paramiko to run this test')
40
 
        super(TestCaseWithSFTPServer, self).setUp()
41
 
        from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer, SFTPServer
42
 
        if getattr(self, '_full_handshake', False):
43
 
            self.server = SFTPServer()
44
 
        else:
45
 
            self._full_handshake = False
46
 
            if getattr(self, '_get_remote_is_absolute', None) is None:
47
 
                self._get_remote_is_absolute = True
48
 
            if self._get_remote_is_absolute:
49
 
                self.server = SFTPAbsoluteServer()
50
 
            else:
51
 
                self.server = SFTPHomeDirServer()
52
 
        self.server.setUp()
53
 
        self.addCleanup(self.server.tearDown)
54
 
        if self._full_handshake:
55
 
            self._sftp_url = self.server._get_sftp_url("")
56
 
        else:
57
 
            self._sftp_url = self.server.get_url()
 
105
        TestCaseInTempDir.setUp(self)
58
106
        self._root = self.test_dir
59
 
        # Set to a string in setUp to give sftp server a new homedir.
60
 
        self._override_home = None
61
107
        self._is_setup = False
 
108
 
 
109
    def delayed_setup(self):
 
110
        # some tests are just stubs that call setUp and then immediately call
 
111
        # tearDwon.  so don't create the port listener until get_transport is
 
112
        # called and we know we're in an actual test.
 
113
        if self._is_setup:
 
114
            return
 
115
        self._listener = SingleListener(self._run_server)
 
116
        self._listener.setDaemon(True)
 
117
        self._listener.start()        
 
118
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
 
119
        self._is_setup = True
 
120
        
 
121
    def tearDown(self):
 
122
        try:
 
123
            self._listener.stop()
 
124
        except AttributeError:
 
125
            pass
 
126
        TestCaseInTempDir.tearDown(self)
 
127
 
 
128
        
 
129
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
 
130
    readonly = False
 
131
 
 
132
    def setUp(self):
 
133
        TestCaseWithSFTPServer.setUp(self)
62
134
        self.sftplogs = []
63
135
 
64
 
    def get_remote_url(self, relpath_to_test_root):
65
 
        # FIXME use urljoin ?
66
 
        return self._sftp_url + '/' + relpath_to_test_root
67
 
 
68
 
    def get_transport(self, path=None):
69
 
        """Return a transport relative to self._test_root."""
70
 
        from bzrlib.transport import get_transport
71
 
        transport = get_transport(self._sftp_url)
72
 
        if path is None:
73
 
            return transport
74
 
        else:
75
 
            return transport.clone(path)
76
 
 
77
 
 
78
 
class SFTPLockTests (TestCaseWithSFTPServer):
 
136
    def log(self, *args):
 
137
        """Override the default log to grab sftp server messages"""
 
138
        TestCaseWithSFTPServer.log(self, *args)
 
139
        if args and args[0].startswith('sftpserver'):
 
140
            self.sftplogs.append(args[0])
 
141
 
 
142
    def get_transport(self):
 
143
        self.delayed_setup()
 
144
        from bzrlib.transport.sftp import SFTPTransport
 
145
        url = self._sftp_url
 
146
        return SFTPTransport(url)
79
147
 
80
148
    def test_sftp_locks(self):
81
149
        from bzrlib.errors import LockError
89
157
        self.assertRaises(LockError, t.lock_write, 'bogus')
90
158
 
91
159
        l.unlock()
92
 
        self.failIf(lexists('bogus.write-lock'))
 
160
        self.failIf(os.path.lexists('bogus.write-lock'))
93
161
 
94
162
        open('something.write-lock', 'wb').write('fake lock\n')
95
163
        self.assertRaises(LockError, t.lock_write, 'something')
104
172
 
105
173
    def test_multiple_connections(self):
106
174
        t = self.get_transport()
107
 
        self.assertTrue('sftpserver - new connection' in self.server.logs)
108
 
        self.server.logs = []
 
175
        self.assertEquals(self.sftplogs, 
 
176
                ['sftpserver - authorizing: foo'
 
177
               , 'sftpserver - channel request: session, 1'])
 
178
        self.sftplogs = []
109
179
        # The second request should reuse the first connection
110
180
        # SingleListener only allows for a single connection,
111
181
        # So the next line fails unless the connection is reused
112
182
        t2 = self.get_transport()
113
 
        self.assertEquals(self.server.logs, [])
114
 
 
115
 
 
116
 
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
117
 
    """Test the SFTP transport with homedir based relative paths."""
118
 
 
119
 
    def test__remote_path(self):
120
 
        t = self.get_transport()
121
 
        # try what is currently used:
122
 
        # remote path = self._abspath(relpath)
123
 
        self.assertEqual(self._root + '/relative', t._remote_path('relative'))
124
 
        # we dont os.path.join because windows gives us the wrong path
125
 
        root_segments = self._root.split('/')
126
 
        root_parent = '/'.join(root_segments[:-1])
127
 
        # .. should be honoured
128
 
        self.assertEqual(root_parent + '/sibling', t._remote_path('../sibling'))
129
 
        # /  should be illegal ?
130
 
        ### FIXME decide and then test for all transports. RBC20051208
131
 
 
132
 
 
133
 
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
134
 
    """Test the SFTP transport with homedir based relative paths."""
135
 
 
136
 
    def setUp(self):
137
 
        self._get_remote_is_absolute = False
138
 
        super(SFTPTransportTestRelative, self).setUp()
139
 
 
140
 
    def test__remote_path_relative_root(self):
141
 
        # relative paths are preserved
142
 
        t = self.get_transport('')
143
 
        self.assertEqual('a', t._remote_path('a'))
 
183
        self.assertEquals(self.sftplogs, [])
144
184
 
145
185
 
146
186
class FakeSFTPTransport (object):
149
189
 
150
190
 
151
191
class SFTPNonServerTest(TestCase):
152
 
    def setUp(self):
153
 
        TestCase.setUp(self)
154
 
        if not paramiko_loaded:
155
 
            raise TestSkipped('you must have paramiko to run this test')
156
 
 
157
192
    def test_parse_url(self):
158
193
        from bzrlib.transport.sftp import SFTPTransport
159
 
        s = SFTPTransport('sftp://simple.example.com/home/source', clone_from=fake)
 
194
        s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
160
195
        self.assertEquals(s._host, 'simple.example.com')
161
196
        self.assertEquals(s._port, None)
162
197
        self.assertEquals(s._path, '/home/source')
163
198
        self.failUnless(s._password is None)
164
199
 
165
 
        self.assertEquals(s.base, 'sftp://simple.example.com/home/source/')
166
 
 
167
 
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative', clone_from=fake)
 
200
        self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source')
 
201
        
 
202
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
168
203
        self.assertEquals(s._host, 'example.com')
169
204
        self.assertEquals(s._port, 2222)
170
205
        self.assertEquals(s._username, 'robey')
172
207
        self.assertEquals(s._path, 'relative')
173
208
 
174
209
        # Base should not keep track of the password
175
 
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/~/relative/')
 
210
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative')
 
211
 
 
212
        # Double slash should be accepted instead of using %2F
 
213
        s = SFTPTransport('sftp://user@example.com:22//absolute/path', clone_from=fake)
 
214
        self.assertEquals(s._host, 'example.com')
 
215
        self.assertEquals(s._port, 22)
 
216
        self.assertEquals(s._username, 'user')
 
217
        self.assertEquals(s._password, None)
 
218
        self.assertEquals(s._path, '/absolute/path')
 
219
 
 
220
        # Also, don't show the port if it is the default 22
 
221
        self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path')
176
222
 
177
223
    def test_relpath(self):
178
224
        from bzrlib.transport.sftp import SFTPTransport
179
225
        from bzrlib.errors import PathNotChild
180
226
 
181
 
        s = SFTPTransport('sftp://user@host.com/abs/path', clone_from=fake)
182
 
        self.assertEquals(s.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
227
        s = SFTPTransport('sftp://user@host.com//abs/path', clone_from=fake)
 
228
        self.assertEquals(s.relpath('sftp://user@host.com//abs/path/sub'), 'sub')
183
229
        # Can't test this one, because we actually get an AssertionError
184
230
        # TODO: Consider raising an exception rather than an assert
185
 
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com/abs/path/sub')
186
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com/abs/path/sub')
187
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com/abs/path/sub')
188
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33/abs/path/sub')
189
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/~/rel/path/sub')
 
231
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com//abs/path/sub')
 
232
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com//abs/path/sub')
 
233
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com//abs/path/sub')
 
234
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33//abs/path/sub')
 
235
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/abs/path/sub')
190
236
 
191
237
        # Make sure it works when we don't supply a username
192
 
        s = SFTPTransport('sftp://host.com/abs/path', clone_from=fake)
193
 
        self.assertEquals(s.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
238
        s = SFTPTransport('sftp://host.com//abs/path', clone_from=fake)
 
239
        self.assertEquals(s.relpath('sftp://host.com//abs/path/sub'), 'sub')
194
240
 
195
241
        # Make sure it works when parts of the path will be url encoded
196
242
        # TODO: These may be incorrect, we might need to urllib.urlencode() before
216
262
 
217
263
    def test_lock_file(self):
218
264
        """Make sure that a Branch accessed over sftp tries to lock itself."""
 
265
        from bzrlib.branch import Branch
 
266
 
 
267
        self.delayed_setup()
219
268
        b = Branch.initialize(self._sftp_url)
220
269
        self.failUnlessExists('.bzr/')
221
270
        self.failUnlessExists('.bzr/branch-format')
222
271
        self.failUnlessExists('.bzr/branch-lock')
223
272
 
224
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
273
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
225
274
        b.lock_write()
226
275
        self.failUnlessExists('.bzr/branch-lock.write-lock')
227
276
        b.unlock()
228
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
277
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
229
278
 
230
279
    def test_no_working_tree(self):
 
280
        from bzrlib.branch import Branch
 
281
        self.delayed_setup()
231
282
        b = Branch.initialize(self._sftp_url)
232
283
        self.assertRaises(errors.NoWorkingTree, b.working_tree)
233
284
 
234
285
    def test_push_support(self):
 
286
        from bzrlib.branch import Branch
 
287
        self.delayed_setup()
 
288
 
235
289
        self.build_tree(['a/', 'a/foo'])
236
290
        b = Branch.initialize('a')
237
291
        t = b.working_tree()
239
293
        t.commit('foo', rev_id='a1')
240
294
 
241
295
        os.mkdir('b')
242
 
        b2 = Branch.initialize(self._sftp_url + '/b')
 
296
        b2 = Branch.initialize(self._sftp_url + 'b')
243
297
        b2.pull(b)
244
298
 
245
299
        self.assertEquals(b2.revision_history(), ['a1'])
246
300
 
247
 
        open('a/foo', 'wt').write('something new in foo\n')
248
 
        t.commit('new', rev_id='a2')
249
 
        b2.pull(b)
250
 
 
251
 
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
252
 
 
253
 
 
254
 
class SFTPFullHandshakingTest(TestCaseWithSFTPServer):
255
 
    """Verify that a full-handshake (SSH over loopback TCP) sftp connection works."""
256
 
    _full_handshake = True
257
 
    
258
 
    def test_connection(self):
259
 
        self.get_transport()
 
301
 
 
302
if not paramiko_loaded:
 
303
    # TODO: Skip these
 
304
    del SFTPTransportTest
 
305
    del SFTPNonServerTest
 
306
    del SFTPBranchTest