~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testsftp.py

[merge] basic_io metaformat (mbp)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
import os
 
18
import socket
 
19
import threading
 
20
import unittest
 
21
 
 
22
from bzrlib.selftest import TestCaseInTempDir
 
23
from bzrlib.selftest.testtransport import TestTransportMixIn
 
24
 
 
25
try:
 
26
    import paramiko
 
27
    from stub_sftp import StubServer, StubSFTPServer
 
28
    paramiko_loaded = True
 
29
except ImportError:
 
30
    paramiko_loaded = False
 
31
 
 
32
 
 
33
STUB_SERVER_KEY = """
 
34
-----BEGIN RSA PRIVATE KEY-----
 
35
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
36
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
37
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
38
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
39
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
40
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
41
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
42
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
43
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
44
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
45
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
46
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
47
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
48
-----END RSA PRIVATE KEY-----
 
49
"""
 
50
    
 
51
 
 
52
class SingleListener (threading.Thread):
 
53
    def __init__(self, callback):
 
54
        threading.Thread.__init__(self)
 
55
        self._callback = callback
 
56
        self._socket = socket.socket()
 
57
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
58
        self._socket.bind(('localhost', 0))
 
59
        self._socket.listen(1)
 
60
        self.port = self._socket.getsockname()[1]
 
61
        self.stop_event = threading.Event()
 
62
 
 
63
    def run(self):
 
64
        s, _ = self._socket.accept()
 
65
        # now close the listen socket
 
66
        self._socket.close()
 
67
        self._callback(s, self.stop_event)
 
68
    
 
69
    def stop(self):
 
70
        self.stop_event.set()
 
71
        
 
72
        
 
73
class TestCaseWithSFTPServer (TestCaseInTempDir):
 
74
    """
 
75
    Execute a test case with a stub SFTP server, serving files from the local
 
76
    filesystem over the loopback network.
 
77
    """
 
78
    
 
79
    def _run_server(self, s, stop_event):
 
80
        ssh_server = paramiko.Transport(s)
 
81
        key_file = os.path.join(self._root, 'test_rsa.key')
 
82
        file(key_file, 'w').write(STUB_SERVER_KEY)
 
83
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
84
        ssh_server.add_server_key(host_key)
 
85
        server = StubServer()
 
86
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
 
87
        event = threading.Event()
 
88
        ssh_server.start_server(event, server)
 
89
        event.wait(5.0)
 
90
        stop_event.wait(30.0)
 
91
 
 
92
    def setUp(self):
 
93
        TestCaseInTempDir.setUp(self)
 
94
        self._root = self.test_dir
 
95
 
 
96
    def delayed_setup(self):
 
97
        # some tests are just stubs that call setUp and then immediately call
 
98
        # tearDwon.  so don't create the port listener until get_transport is
 
99
        # called and we know we're in an actual test.
 
100
        self._listener = SingleListener(self._run_server)
 
101
        self._listener.setDaemon(True)
 
102
        self._listener.start()        
 
103
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
 
104
        
 
105
    def tearDown(self):
 
106
        try:
 
107
            self._listener.stop()
 
108
        except AttributeError:
 
109
            pass
 
110
        TestCaseInTempDir.tearDown(self)
 
111
 
 
112
        
 
113
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
 
114
    readonly = False
 
115
    setup = True
 
116
 
 
117
    def get_transport(self):
 
118
        if self.setup:
 
119
            self.delayed_setup()
 
120
            self.setup = False
 
121
        from bzrlib.transport.sftp import SFTPTransport
 
122
        url = self._sftp_url
 
123
        return SFTPTransport(url)
 
124
 
 
125
    def test_sftp_locks(self):
 
126
        from bzrlib.errors import LockError
 
127
        t = self.get_transport()
 
128
 
 
129
        l = t.lock_write('bogus')
 
130
        self.failUnlessExists('bogus.write-lock')
 
131
 
 
132
        # Don't wait for the lock, locking an already locked
 
133
        # file should raise an assert
 
134
        self.assertRaises(LockError, t.lock_write, 'bogus')
 
135
 
 
136
        l.unlock()
 
137
        self.failIf(os.path.lexists('bogus.write-lock'))
 
138
 
 
139
        open('something.write-lock', 'wb').write('fake lock\n')
 
140
        self.assertRaises(LockError, t.lock_write, 'something')
 
141
        os.remove('something.write-lock')
 
142
 
 
143
        l = t.lock_write('something')
 
144
 
 
145
        l2 = t.lock_write('bogus')
 
146
 
 
147
        l.unlock()
 
148
        l2.unlock()
 
149
 
 
150
 
 
151
class FakeSFTPTransport (object):
 
152
    _sftp = object()
 
153
fake = FakeSFTPTransport()
 
154
 
 
155
 
 
156
class SFTPNonServerTest (unittest.TestCase):
 
157
    def test_parse_url(self):
 
158
        from bzrlib.transport.sftp import SFTPTransport
 
159
        s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
 
160
        self.assertEquals(s._host, 'simple.example.com')
 
161
        self.assertEquals(s._port, 22)
 
162
        self.assertEquals(s._path, '/home/source')
 
163
        self.assert_(s._password is None)
 
164
        
 
165
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
 
166
        self.assertEquals(s._host, 'example.com')
 
167
        self.assertEquals(s._port, 2222)
 
168
        self.assertEquals(s._username, 'robey')
 
169
        self.assertEquals(s._password, 'h@t')
 
170
        self.assertEquals(s._path, 'relative')
 
171
        
 
172
 
 
173
class SFTPBranchTest(TestCaseWithSFTPServer):
 
174
    """Test some stuff when accessing a bzr Branch over sftp"""
 
175
 
 
176
    def test_lock_file(self):
 
177
        """Make sure that a Branch accessed over sftp tries to lock itself."""
 
178
        from bzrlib.branch import Branch
 
179
 
 
180
        self.delayed_setup()
 
181
        b = Branch.initialize(self._sftp_url)
 
182
        self.failUnlessExists('.bzr/')
 
183
        self.failUnlessExists('.bzr/branch-format')
 
184
        self.failUnlessExists('.bzr/branch-lock')
 
185
 
 
186
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
 
187
        b.lock_write()
 
188
        self.failUnlessExists('.bzr/branch-lock.write-lock')
 
189
        b.unlock()
 
190
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
 
191
 
 
192
 
 
193
if not paramiko_loaded:
 
194
    # TODO: Skip these
 
195
    del SFTPTransportTest
 
196
    del SFTPNonServerTest
 
197
    del SFTPBranchTest