~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp_transport.py

[merge] jam-integration 1503

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
import socket
19
19
import threading
20
20
 
21
 
from bzrlib.tests import TestCaseInTempDir, TestCase, TestSkipped
22
 
from bzrlib.tests.test_transport import TestTransportMixIn
 
21
from bzrlib.branch import Branch
23
22
import bzrlib.errors as errors
24
23
from bzrlib.osutils import pathjoin, lexists
 
24
from bzrlib.tests import TestCaseInTempDir, TestCase, TestSkipped
 
25
import bzrlib.transport
25
26
 
26
27
try:
27
28
    import paramiko
28
 
    from stub_sftp import StubServer, StubSFTPServer
29
29
    paramiko_loaded = True
30
30
except ImportError:
31
31
    paramiko_loaded = False
32
32
 
33
 
# XXX: 20051124 jamesh
34
 
# The tests currently pop up a password prompt when an external ssh
35
 
# is used.  This forces the use of the paramiko implementation.
36
 
if paramiko_loaded:
37
 
    import bzrlib.transport.sftp
38
 
    bzrlib.transport.sftp._ssh_vendor = 'none'
39
 
 
40
 
 
41
 
STUB_SERVER_KEY = """
42
 
-----BEGIN RSA PRIVATE KEY-----
43
 
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
44
 
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
45
 
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
46
 
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
47
 
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
48
 
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
49
 
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
50
 
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
51
 
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
52
 
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
53
 
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
54
 
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
55
 
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
56
 
-----END RSA PRIVATE KEY-----
57
 
"""
58
 
    
59
 
 
60
 
class SingleListener (threading.Thread):
61
 
    def __init__(self, callback):
62
 
        threading.Thread.__init__(self)
63
 
        self._callback = callback
64
 
        self._socket = socket.socket()
65
 
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
66
 
        self._socket.bind(('localhost', 0))
67
 
        self._socket.listen(1)
68
 
        self.port = self._socket.getsockname()[1]
69
 
        self.stop_event = threading.Event()
70
 
 
71
 
    def run(self):
72
 
        s, _ = self._socket.accept()
73
 
        # now close the listen socket
74
 
        self._socket.close()
75
 
        self._callback(s, self.stop_event)
76
 
    
77
 
    def stop(self):
78
 
        self.stop_event.set()
79
 
        # We should consider waiting for the other thread
80
 
        # to stop, because otherwise we get spurious
81
 
        #   bzr: ERROR: Socket exception: Connection reset by peer (54)
82
 
        # because the test suite finishes before the thread has a chance
83
 
        # to close. (Especially when only running a few tests)
84
 
        
85
 
        
86
 
class TestCaseWithSFTPServer (TestCaseInTempDir):
87
 
    """
88
 
    Execute a test case with a stub SFTP server, serving files from the local
89
 
    filesystem over the loopback network.
90
 
    """
91
 
    
92
 
    def _run_server(self, s, stop_event):
93
 
        ssh_server = paramiko.Transport(s)
94
 
        key_file = pathjoin(self._root, 'test_rsa.key')
95
 
        file(key_file, 'w').write(STUB_SERVER_KEY)
96
 
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
97
 
        ssh_server.add_server_key(host_key)
98
 
        server = StubServer(self)
99
 
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
100
 
        event = threading.Event()
101
 
        ssh_server.start_server(event, server)
102
 
        event.wait(5.0)
103
 
        stop_event.wait(30.0)
 
33
 
 
34
class TestCaseWithSFTPServer(TestCaseInTempDir):
 
35
    """A test case base class that provides a sftp server on localhost."""
104
36
 
105
37
    def setUp(self):
106
 
        TestCaseInTempDir.setUp(self)
107
38
        if not paramiko_loaded:
108
39
            raise TestSkipped('you must have paramiko to run this test')
 
40
        super(TestCaseWithSFTPServer, self).setUp()
 
41
        from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
 
42
        if getattr(self, '_get_remote_is_absolute', None) is None:
 
43
            self._get_remote_is_absolute = True
 
44
        if self._get_remote_is_absolute:
 
45
            self.server = SFTPAbsoluteServer()
 
46
        else:
 
47
            self.server = SFTPHomeDirServer()
 
48
        self.server.setUp()
 
49
        self.addCleanup(self.server.tearDown)
 
50
        self._sftp_url = self.server.get_url()
109
51
        self._root = self.test_dir
 
52
        # Set to a string in setUp to give sftp server a new homedir.
 
53
        self._override_home = None
110
54
        self._is_setup = False
111
 
 
112
 
    def delayed_setup(self):
113
 
        # some tests are just stubs that call setUp and then immediately call
114
 
        # tearDwon.  so don't create the port listener until get_transport is
115
 
        # called and we know we're in an actual test.
116
 
        if self._is_setup:
117
 
            return
118
 
        self._listener = SingleListener(self._run_server)
119
 
        self._listener.setDaemon(True)
120
 
        self._listener.start()        
121
 
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
122
 
        self._is_setup = True
123
 
        
124
 
    def tearDown(self):
125
 
        try:
126
 
            self._listener.stop()
127
 
        except AttributeError:
128
 
            pass
129
 
        TestCaseInTempDir.tearDown(self)
130
 
 
131
 
        
132
 
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
133
 
    readonly = False
134
 
 
135
 
    def setUp(self):
136
 
        TestCaseWithSFTPServer.setUp(self)
137
55
        self.sftplogs = []
138
56
 
139
 
    def log(self, *args):
140
 
        """Override the default log to grab sftp server messages"""
141
 
        TestCaseWithSFTPServer.log(self, *args)
142
 
        if args and args[0].startswith('sftpserver'):
143
 
            self.sftplogs.append(args[0])
144
 
 
145
 
    def get_transport(self):
146
 
        self.delayed_setup()
147
 
        from bzrlib.transport.sftp import SFTPTransport
148
 
        url = self._sftp_url
149
 
        return SFTPTransport(url)
 
57
    def get_remote_url(self, relpath_to_test_root):
 
58
        # FIXME use urljoin ?
 
59
        return self._sftp_url + '/' + relpath_to_test_root
 
60
 
 
61
    def get_transport(self, path=None):
 
62
        """Return a transport relative to self._test_root."""
 
63
        from bzrlib.transport import get_transport
 
64
        transport = get_transport(self._sftp_url)
 
65
        if path is None:
 
66
            return transport
 
67
        else:
 
68
            return transport.clone(path)
 
69
 
 
70
 
 
71
class SFTPLockTests (TestCaseWithSFTPServer):
150
72
 
151
73
    def test_sftp_locks(self):
152
74
        from bzrlib.errors import LockError
175
97
 
176
98
    def test_multiple_connections(self):
177
99
        t = self.get_transport()
178
 
        self.assertEquals(self.sftplogs, 
 
100
        self.assertEquals(self.server.logs, 
179
101
                ['sftpserver - authorizing: foo'
180
102
               , 'sftpserver - channel request: session, 1'])
181
 
        self.sftplogs = []
 
103
        self.server.logs = []
182
104
        # The second request should reuse the first connection
183
105
        # SingleListener only allows for a single connection,
184
106
        # So the next line fails unless the connection is reused
185
107
        t2 = self.get_transport()
186
 
        self.assertEquals(self.sftplogs, [])
 
108
        self.assertEquals(self.server.logs, [])
 
109
 
 
110
 
 
111
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
 
112
    """Test the SFTP transport with homedir based relative paths."""
 
113
 
 
114
    def test__remote_path(self):
 
115
        t = self.get_transport()
 
116
        # try what is currently used:
 
117
        # remote path = self._abspath(relpath)
 
118
        self.assertEqual(self._root + '/relative', t._remote_path('relative'))
 
119
        # we dont os.path.join because windows gives us the wrong path
 
120
        root_segments = self._root.split('/')
 
121
        root_parent = '/'.join(root_segments[:-1])
 
122
        # .. should be honoured
 
123
        self.assertEqual(root_parent + '/sibling', t._remote_path('../sibling'))
 
124
        # /  should be illegal ?
 
125
        ### FIXME decide and then test for all transports. RBC20051208
 
126
 
 
127
 
 
128
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
 
129
    """Test the SFTP transport with homedir based relative paths."""
 
130
 
 
131
    def setUp(self):
 
132
        self._get_remote_is_absolute = False
 
133
        super(SFTPTransportTestRelative, self).setUp()
 
134
 
 
135
    def test__remote_path_relative_root(self):
 
136
        # relative paths are preserved
 
137
        t = self.get_transport('')
 
138
        self.assertEqual('a', t._remote_path('a'))
187
139
 
188
140
 
189
141
class FakeSFTPTransport (object):
205
157
        self.assertEquals(s._path, '/home/source')
206
158
        self.failUnless(s._password is None)
207
159
 
208
 
        self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source')
 
160
        self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source/')
209
161
        
210
162
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
211
163
        self.assertEquals(s._host, 'example.com')
215
167
        self.assertEquals(s._path, 'relative')
216
168
 
217
169
        # Base should not keep track of the password
218
 
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative')
 
170
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative/')
219
171
 
220
172
        # Double slash should be accepted instead of using %2F
221
 
        s = SFTPTransport('sftp://user@example.com:22//absolute/path', clone_from=fake)
 
173
        s = SFTPTransport('sftp://user@example.com:22//absolute/path/', clone_from=fake)
222
174
        self.assertEquals(s._host, 'example.com')
223
175
        self.assertEquals(s._port, 22)
224
176
        self.assertEquals(s._username, 'user')
225
177
        self.assertEquals(s._password, None)
226
 
        self.assertEquals(s._path, '/absolute/path')
 
178
        self.assertEquals(s._path, '/absolute/path/')
227
179
 
228
180
        # Also, don't show the port if it is the default 22
229
 
        self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path')
 
181
        self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path/')
230
182
 
231
183
    def test_relpath(self):
232
184
        from bzrlib.transport.sftp import SFTPTransport
270
222
 
271
223
    def test_lock_file(self):
272
224
        """Make sure that a Branch accessed over sftp tries to lock itself."""
273
 
        from bzrlib.branch import Branch
274
 
 
275
 
        self.delayed_setup()
276
225
        b = Branch.initialize(self._sftp_url)
277
226
        self.failUnlessExists('.bzr/')
278
227
        self.failUnlessExists('.bzr/branch-format')
285
234
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
286
235
 
287
236
    def test_no_working_tree(self):
288
 
        from bzrlib.branch import Branch
289
 
        self.delayed_setup()
290
237
        b = Branch.initialize(self._sftp_url)
291
238
        self.assertRaises(errors.NoWorkingTree, b.working_tree)
292
239
 
293
240
    def test_push_support(self):
294
 
        from bzrlib.branch import Branch
295
 
        self.delayed_setup()
296
 
 
297
241
        self.build_tree(['a/', 'a/foo'])
298
242
        b = Branch.initialize('a')
299
243
        t = b.working_tree()
301
245
        t.commit('foo', rev_id='a1')
302
246
 
303
247
        os.mkdir('b')
304
 
        b2 = Branch.initialize(self._sftp_url + 'b')
 
248
        b2 = Branch.initialize(self._sftp_url + '/b')
305
249
        b2.pull(b)
306
250
 
307
251
        self.assertEquals(b2.revision_history(), ['a1'])