~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp.py

[patch] use unicode literals for all hardcoded paths (Alexander Belchenko)

> When you use flat string on Windows for base part of file names then all
> derived file names is always representing as flat string. On Linux/Cygwin as
> I can see in situations when path cannot be represented as flat string (or in
> ascii encoding?) it silently converted to unicode. As result we have
> different behaviour with non-ascii (non-english) file names.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
import os
18
18
import socket
19
19
import threading
 
20
import unittest
20
21
 
21
 
from bzrlib.tests import TestCaseInTempDir, TestCase, TestSkipped
 
22
from bzrlib.tests import TestCaseInTempDir
22
23
from bzrlib.tests.test_transport import TestTransportMixIn
23
 
import bzrlib.errors as errors
24
 
from bzrlib.osutils import pathjoin, lexists
25
24
 
26
25
try:
27
26
    import paramiko
30
29
except ImportError:
31
30
    paramiko_loaded = False
32
31
 
33
 
# XXX: 20051124 jamesh
34
 
# The tests currently pop up a password prompt when an external ssh
35
 
# is used.  This forces the use of the paramiko implementation.
36
 
if paramiko_loaded:
37
 
    import bzrlib.transport.sftp
38
 
    bzrlib.transport.sftp._ssh_vendor = 'none'
39
 
 
40
32
 
41
33
STUB_SERVER_KEY = """
42
34
-----BEGIN RSA PRIVATE KEY-----
76
68
    
77
69
    def stop(self):
78
70
        self.stop_event.set()
79
 
        # We should consider waiting for the other thread
80
 
        # to stop, because otherwise we get spurious
81
 
        #   bzr: ERROR: Socket exception: Connection reset by peer (54)
82
 
        # because the test suite finishes before the thread has a chance
83
 
        # to close. (Especially when only running a few tests)
84
71
        
85
72
        
86
73
class TestCaseWithSFTPServer (TestCaseInTempDir):
91
78
    
92
79
    def _run_server(self, s, stop_event):
93
80
        ssh_server = paramiko.Transport(s)
94
 
        key_file = pathjoin(self._root, 'test_rsa.key')
 
81
        key_file = os.path.join(self._root, 'test_rsa.key')
95
82
        file(key_file, 'w').write(STUB_SERVER_KEY)
96
83
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
97
84
        ssh_server.add_server_key(host_key)
98
 
        server = StubServer(self)
 
85
        server = StubServer()
99
86
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
100
87
        event = threading.Event()
101
88
        ssh_server.start_server(event, server)
103
90
        stop_event.wait(30.0)
104
91
 
105
92
    def setUp(self):
106
 
        if not paramiko_loaded:
107
 
            raise TestSkipped('you must have paramiko to run this test')
108
93
        TestCaseInTempDir.setUp(self)
109
94
        self._root = self.test_dir
110
 
        self._is_setup = False
111
95
 
112
96
    def delayed_setup(self):
113
97
        # some tests are just stubs that call setUp and then immediately call
114
98
        # tearDwon.  so don't create the port listener until get_transport is
115
99
        # called and we know we're in an actual test.
116
 
        if self._is_setup:
117
 
            return
118
100
        self._listener = SingleListener(self._run_server)
119
101
        self._listener.setDaemon(True)
120
102
        self._listener.start()        
121
103
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
122
 
        self._is_setup = True
123
104
        
124
105
    def tearDown(self):
125
106
        try:
131
112
        
132
113
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
133
114
    readonly = False
134
 
 
135
 
    def setUp(self):
136
 
        TestCaseWithSFTPServer.setUp(self)
137
 
        self.sftplogs = []
138
 
 
139
 
    def log(self, *args):
140
 
        """Override the default log to grab sftp server messages"""
141
 
        TestCaseWithSFTPServer.log(self, *args)
142
 
        if args and args[0].startswith('sftpserver'):
143
 
            self.sftplogs.append(args[0])
 
115
    setup = True
144
116
 
145
117
    def get_transport(self):
146
 
        self.delayed_setup()
 
118
        if self.setup:
 
119
            self.delayed_setup()
 
120
            self.setup = False
147
121
        from bzrlib.transport.sftp import SFTPTransport
148
122
        url = self._sftp_url
149
123
        return SFTPTransport(url)
160
134
        self.assertRaises(LockError, t.lock_write, 'bogus')
161
135
 
162
136
        l.unlock()
163
 
        self.failIf(lexists('bogus.write-lock'))
 
137
        self.failIf(os.path.lexists('bogus.write-lock'))
164
138
 
165
139
        open('something.write-lock', 'wb').write('fake lock\n')
166
140
        self.assertRaises(LockError, t.lock_write, 'something')
173
147
        l.unlock()
174
148
        l2.unlock()
175
149
 
176
 
    def test_multiple_connections(self):
177
 
        t = self.get_transport()
178
 
        self.assertEquals(self.sftplogs, 
179
 
                ['sftpserver - authorizing: foo'
180
 
               , 'sftpserver - channel request: session, 1'])
181
 
        self.sftplogs = []
182
 
        # The second request should reuse the first connection
183
 
        # SingleListener only allows for a single connection,
184
 
        # So the next line fails unless the connection is reused
185
 
        t2 = self.get_transport()
186
 
        self.assertEquals(self.sftplogs, [])
187
 
 
188
150
 
189
151
class FakeSFTPTransport (object):
190
152
    _sftp = object()
191
153
fake = FakeSFTPTransport()
192
154
 
193
155
 
194
 
class SFTPNonServerTest(TestCase):
195
 
    def setUp(self):
196
 
        TestCase.setUp(self)
197
 
        if not paramiko_loaded:
198
 
            raise TestSkipped('you must have paramiko to run this test')
199
 
 
 
156
class SFTPNonServerTest(unittest.TestCase):
200
157
    def test_parse_url(self):
201
158
        from bzrlib.transport.sftp import SFTPTransport
202
159
        s = SFTPTransport('sftp://simple.example.com/%2fhome/source', clone_from=fake)
203
160
        self.assertEquals(s._host, 'simple.example.com')
204
 
        self.assertEquals(s._port, None)
 
161
        self.assertEquals(s._port, 22)
205
162
        self.assertEquals(s._path, '/home/source')
206
 
        self.failUnless(s._password is None)
207
 
 
208
 
        self.assertEquals(s.base, 'sftp://simple.example.com/%2Fhome/source')
 
163
        self.assert_(s._password is None)
209
164
        
210
165
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/relative', clone_from=fake)
211
166
        self.assertEquals(s._host, 'example.com')
214
169
        self.assertEquals(s._password, 'h@t')
215
170
        self.assertEquals(s._path, 'relative')
216
171
 
217
 
        # Base should not keep track of the password
218
 
        self.assertEquals(s.base, 'sftp://robey@example.com:2222/relative')
219
 
 
220
 
        # Double slash should be accepted instead of using %2F
221
 
        s = SFTPTransport('sftp://user@example.com:22//absolute/path', clone_from=fake)
222
 
        self.assertEquals(s._host, 'example.com')
223
 
        self.assertEquals(s._port, 22)
224
 
        self.assertEquals(s._username, 'user')
225
 
        self.assertEquals(s._password, None)
226
 
        self.assertEquals(s._path, '/absolute/path')
227
 
 
228
 
        # Also, don't show the port if it is the default 22
229
 
        self.assertEquals(s.base, 'sftp://user@example.com:22/%2Fabsolute/path')
230
 
 
231
 
    def test_relpath(self):
232
 
        from bzrlib.transport.sftp import SFTPTransport
233
 
        from bzrlib.errors import PathNotChild
234
 
 
235
 
        s = SFTPTransport('sftp://user@host.com//abs/path', clone_from=fake)
236
 
        self.assertEquals(s.relpath('sftp://user@host.com//abs/path/sub'), 'sub')
237
 
        # Can't test this one, because we actually get an AssertionError
238
 
        # TODO: Consider raising an exception rather than an assert
239
 
        #self.assertRaises(PathNotChild, s.relpath, 'http://user@host.com//abs/path/sub')
240
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user2@host.com//abs/path/sub')
241
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@otherhost.com//abs/path/sub')
242
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com:33//abs/path/sub')
243
 
        self.assertRaises(PathNotChild, s.relpath, 'sftp://user@host.com/abs/path/sub')
244
 
 
245
 
        # Make sure it works when we don't supply a username
246
 
        s = SFTPTransport('sftp://host.com//abs/path', clone_from=fake)
247
 
        self.assertEquals(s.relpath('sftp://host.com//abs/path/sub'), 'sub')
248
 
 
249
 
        # Make sure it works when parts of the path will be url encoded
250
 
        # TODO: These may be incorrect, we might need to urllib.urlencode() before
251
 
        # we pass the paths into the SFTPTransport constructor
252
 
        s = SFTPTransport('sftp://host.com/dev/,path', clone_from=fake)
253
 
        self.assertEquals(s.relpath('sftp://host.com/dev/,path/sub'), 'sub')
254
 
        s = SFTPTransport('sftp://host.com/dev/%path', clone_from=fake)
255
 
        self.assertEquals(s.relpath('sftp://host.com/dev/%path/sub'), 'sub')
256
 
 
257
172
    def test_parse_invalid_url(self):
258
 
        from bzrlib.transport.sftp import SFTPTransport, TransportError
 
173
        from bzrlib.transport.sftp import SFTPTransport, SFTPTransportError
259
174
        try:
260
175
            s = SFTPTransport('sftp://lilypond.org:~janneke/public_html/bzr/gub',
261
176
                              clone_from=fake)
262
177
            self.fail('expected exception not raised')
263
 
        except TransportError, e:
 
178
        except SFTPTransportError, e:
264
179
            self.assertEquals(str(e), 
265
180
                    '~janneke: invalid port number')
266
181
 
 
182
        
267
183
 
268
184
class SFTPBranchTest(TestCaseWithSFTPServer):
269
185
    """Test some stuff when accessing a bzr Branch over sftp"""
278
194
        self.failUnlessExists('.bzr/branch-format')
279
195
        self.failUnlessExists('.bzr/branch-lock')
280
196
 
281
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
197
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
282
198
        b.lock_write()
283
199
        self.failUnlessExists('.bzr/branch-lock.write-lock')
284
200
        b.unlock()
285
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
286
 
 
287
 
    def test_no_working_tree(self):
288
 
        from bzrlib.branch import Branch
289
 
        self.delayed_setup()
290
 
        b = Branch.initialize(self._sftp_url)
291
 
        self.assertRaises(errors.NoWorkingTree, b.working_tree)
292
 
 
293
 
    def test_push_support(self):
294
 
        from bzrlib.branch import Branch
295
 
        self.delayed_setup()
296
 
 
297
 
        self.build_tree(['a/', 'a/foo'])
298
 
        b = Branch.initialize('a')
299
 
        t = b.working_tree()
300
 
        t.add('foo')
301
 
        t.commit('foo', rev_id='a1')
302
 
 
303
 
        os.mkdir('b')
304
 
        b2 = Branch.initialize(self._sftp_url + 'b')
305
 
        b2.pull(b)
306
 
 
307
 
        self.assertEquals(b2.revision_history(), ['a1'])
308
 
 
309
 
        open('a/foo', 'wt').write('something new in foo\n')
310
 
        t.commit('new', rev_id='a2')
311
 
        b2.pull(b)
312
 
 
313
 
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
314
 
 
315
 
 
 
201
        self.failIf(os.path.lexists('.bzr/branch-lock.write-lock'))
 
202
 
 
203
 
 
204
if not paramiko_loaded:
 
205
    # TODO: Skip these
 
206
    del SFTPTransportTest
 
207
    del SFTPNonServerTest
 
208
    del SFTPBranchTest