~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp_transport.py

  • Committer: Patch Queue Manager
  • Date: 2016-02-01 19:56:05 UTC
  • mfrom: (6615.1.1 trunk)
  • Revision ID: pqm@pqm.ubuntu.com-20160201195605-o7rl92wf6uyum3fk
(vila) Open trunk again as 2.8b1 (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Robey Pointer <robey@lag.net>
 
1
# Copyright (C) 2005-2012, 2016 Robey Pointer <robey@lag.net>
2
2
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
3
#
4
4
# This program is free software; you can redistribute it and/or modify
18
18
import os
19
19
import socket
20
20
import sys
21
 
import threading
22
21
import time
23
22
 
24
23
from bzrlib import (
25
 
    bzrdir,
26
24
    config,
 
25
    controldir,
27
26
    errors,
28
27
    tests,
29
28
    transport as _mod_transport,
30
29
    ui,
31
30
    )
32
31
from bzrlib.osutils import (
33
 
    pathjoin,
34
32
    lexists,
35
 
    set_or_unset_env,
36
33
    )
37
34
from bzrlib.tests import (
38
35
    features,
47
44
    from bzrlib.transport import sftp as _mod_sftp
48
45
    from bzrlib.tests import stub_sftp
49
46
 
50
 
from bzrlib.workingtree import WorkingTree
51
 
 
52
47
 
53
48
def set_test_transport_to_sftp(testcase):
54
49
    """A helper to set transports on test case instances."""
77
72
        t = self.get_transport()
78
73
 
79
74
        l = t.lock_write('bogus')
80
 
        self.failUnlessExists('bogus.write-lock')
 
75
        self.assertPathExists('bogus.write-lock')
81
76
 
82
77
        # Don't wait for the lock, locking an already locked
83
78
        # file should raise an assert
84
79
        self.assertRaises(LockError, t.lock_write, 'bogus')
85
80
 
86
81
        l.unlock()
87
 
        self.failIf(lexists('bogus.write-lock'))
 
82
        self.assertFalse(lexists('bogus.write-lock'))
88
83
 
89
 
        open('something.write-lock', 'wb').write('fake lock\n')
 
84
        with open('something.write-lock', 'wb') as f: f.write('fake lock\n')
90
85
        self.assertRaises(LockError, t.lock_write, 'something')
91
86
        os.remove('something.write-lock')
92
87
 
145
140
    def test__remote_path_relative_root(self):
146
141
        # relative paths are preserved
147
142
        t = self.get_transport('')
148
 
        self.assertEqual('/~/', t._path)
 
143
        self.assertEqual('/~/', t._parsed_url.path)
149
144
        # the remote path should be relative to home dir
150
145
        # (i.e. not begining with a '/')
151
146
        self.assertEqual('a', t._remote_path('a'))
152
147
 
153
148
 
154
149
class SFTPNonServerTest(TestCase):
 
150
 
155
151
    def setUp(self):
156
 
        TestCase.setUp(self)
 
152
        super(SFTPNonServerTest, self).setUp()
157
153
        self.requireFeature(features.paramiko)
158
154
 
159
155
    def test_parse_url_with_home_dir(self):
160
156
        s = _mod_sftp.SFTPTransport(
161
157
            'sftp://ro%62ey:h%40t@example.com:2222/~/relative')
162
 
        self.assertEquals(s._host, 'example.com')
163
 
        self.assertEquals(s._port, 2222)
164
 
        self.assertEquals(s._user, 'robey')
165
 
        self.assertEquals(s._password, 'h@t')
166
 
        self.assertEquals(s._path, '/~/relative/')
 
158
        self.assertEqual(s._parsed_url.host, 'example.com')
 
159
        self.assertEqual(s._parsed_url.port, 2222)
 
160
        self.assertEqual(s._parsed_url.user, 'robey')
 
161
        self.assertEqual(s._parsed_url.password, 'h@t')
 
162
        self.assertEqual(s._parsed_url.path, '/~/relative/')
167
163
 
168
164
    def test_relpath(self):
169
165
        s = _mod_sftp.SFTPTransport('sftp://user@host.com/abs/path')
174
170
        """Test that if no 'ssh' is available we get builtin paramiko"""
175
171
        from bzrlib.transport import ssh
176
172
        # set '.' as the only location in the path, forcing no 'ssh' to exist
177
 
        orig_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
178
 
        orig_path = set_or_unset_env('PATH', '.')
179
 
        try:
180
 
            # No vendor defined yet, query for one
181
 
            ssh._ssh_vendor_manager.clear_cache()
182
 
            vendor = ssh._get_ssh_vendor()
183
 
            self.assertIsInstance(vendor, ssh.ParamikoVendor)
184
 
        finally:
185
 
            set_or_unset_env('PATH', orig_path)
186
 
            ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
 
173
        self.overrideAttr(ssh, '_ssh_vendor_manager')
 
174
        self.overrideEnv('PATH', '.')
 
175
        ssh._ssh_vendor_manager.clear_cache()
 
176
        vendor = ssh._get_ssh_vendor()
 
177
        self.assertIsInstance(vendor, ssh.ParamikoVendor)
187
178
 
188
179
    def test_abspath_root_sibling_server(self):
189
180
        server = stub_sftp.SFTPSiblingAbsoluteServer()
190
181
        server.start_server()
191
 
        try:
192
 
            transport = _mod_transport.get_transport(server.get_url())
193
 
            self.assertFalse(transport.abspath('/').endswith('/~/'))
194
 
            self.assertTrue(transport.abspath('/').endswith('/'))
195
 
            del transport
196
 
        finally:
197
 
            server.stop_server()
 
182
        self.addCleanup(server.stop_server)
 
183
 
 
184
        transport = _mod_transport.get_transport_from_url(server.get_url())
 
185
        self.assertFalse(transport.abspath('/').endswith('/~/'))
 
186
        self.assertTrue(transport.abspath('/').endswith('/'))
 
187
        del transport
198
188
 
199
189
 
200
190
class SFTPBranchTest(TestCaseWithSFTPServer):
201
191
    """Test some stuff when accessing a bzr Branch over sftp"""
202
192
 
203
 
    def test_lock_file(self):
204
 
        # old format branches use a special lock file on sftp.
205
 
        b = self.make_branch('', format=bzrdir.BzrDirFormat6())
206
 
        b = bzrlib.branch.Branch.open(self.get_url())
207
 
        self.failUnlessExists('.bzr/')
208
 
        self.failUnlessExists('.bzr/branch-format')
209
 
        self.failUnlessExists('.bzr/branch-lock')
210
 
 
211
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
212
 
        b.lock_write()
213
 
        self.failUnlessExists('.bzr/branch-lock.write-lock')
214
 
        b.unlock()
215
 
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
216
 
 
217
193
    def test_push_support(self):
218
194
        self.build_tree(['a/', 'a/foo'])
219
 
        t = bzrdir.BzrDir.create_standalone_workingtree('a')
 
195
        t = controldir.ControlDir.create_standalone_workingtree('a')
220
196
        b = t.branch
221
197
        t.add('foo')
222
198
        t.commit('foo', rev_id='a1')
223
199
 
224
 
        b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
 
200
        b2 = controldir.ControlDir.create_branch_and_repo(self.get_url('/b'))
225
201
        b2.pull(b)
226
202
 
227
 
        self.assertEquals(b2.revision_history(), ['a1'])
 
203
        self.assertEqual(b2.last_revision(), 'a1')
228
204
 
229
 
        open('a/foo', 'wt').write('something new in foo\n')
 
205
        with open('a/foo', 'wt') as f: f.write('something new in foo\n')
230
206
        t.commit('new', rev_id='a2')
231
207
        b2.pull(b)
232
208
 
233
 
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
 
209
        self.assertEqual(b2.last_revision(), 'a2')
234
210
 
235
211
 
236
212
class SSHVendorConnection(TestCaseWithSFTPServer):
304
280
        self.addCleanup(s.close)
305
281
        self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
306
282
 
307
 
    def set_vendor(self, vendor):
 
283
    def set_vendor(self, vendor, subprocess_stderr=None):
308
284
        from bzrlib.transport import ssh
309
285
        self.overrideAttr(ssh._ssh_vendor_manager, '_cached_ssh_vendor', vendor)
 
286
        if subprocess_stderr is not None:
 
287
            self.overrideAttr(ssh.SubprocessVendor, "_stderr_target",
 
288
                subprocess_stderr)
310
289
 
311
290
    def test_bad_connection_paramiko(self):
312
291
        """Test that a real connection attempt raises the right error"""
313
292
        from bzrlib.transport import ssh
314
293
        self.set_vendor(ssh.ParamikoVendor())
315
 
        t = _mod_transport.get_transport(self.bogus_url)
 
294
        t = _mod_transport.get_transport_from_url(self.bogus_url)
316
295
        self.assertRaises(errors.ConnectionError, t.get, 'foobar')
317
296
 
318
297
    def test_bad_connection_ssh(self):
319
298
        """None => auto-detect vendor"""
320
 
        self.set_vendor(None)
321
 
        # This is how I would normally test the connection code
322
 
        # it makes it very clear what we are testing.
323
 
        # However, 'ssh' will create stipple on the output, so instead
324
 
        # I'm using run_bzr_subprocess, and parsing the output
325
 
        # try:
326
 
        #     t = _mod_transport.get_transport(self.bogus_url)
327
 
        # except errors.ConnectionError:
328
 
        #     # Correct error
329
 
        #     pass
330
 
        # except errors.NameError, e:
331
 
        #     if 'SSHException' in str(e):
332
 
        #         raise TestSkipped('Known NameError bug in paramiko 1.6.1')
333
 
        #     raise
334
 
        # else:
335
 
        #     self.fail('Excepted ConnectionError to be raised')
336
 
 
337
 
        out, err = self.run_bzr_subprocess(['log', self.bogus_url], retcode=3)
338
 
        self.assertEqual('', out)
339
 
        if "NameError: global name 'SSHException'" in err:
340
 
            # We aren't fixing this bug, because it is a bug in
341
 
            # paramiko, but we know about it, so we don't have to
342
 
            # fail the test
343
 
            raise TestSkipped('Known NameError bug with paramiko-1.6.1')
344
 
        self.assertContainsRe(err, r'bzr: ERROR: Unable to connect to SSH host'
345
 
                                   r' 127\.0\.0\.1:\d+; ')
 
299
        f = file(os.devnull, "wb")
 
300
        self.addCleanup(f.close)
 
301
        self.set_vendor(None, f)
 
302
        t = _mod_transport.get_transport_from_url(self.bogus_url)
 
303
        try:
 
304
            self.assertRaises(errors.ConnectionError, t.get, 'foobar')
 
305
        except NameError, e:
 
306
            if "global name 'SSHException'" in str(e):
 
307
                self.knownFailure('Known NameError bug in paramiko 1.6.1')
 
308
            raise
346
309
 
347
310
 
348
311
class SFTPLatencyKnob(TestCaseWithSFTPServer):
399
362
class TestSocketDelay(TestCase):
400
363
 
401
364
    def setUp(self):
402
 
        TestCase.setUp(self)
 
365
        super(TestSocketDelay, self).setUp()
403
366
        self.requireFeature(features.paramiko)
404
367
 
405
368
    def test_delay(self):
442
405
 
443
406
 
444
407
class ReadvFile(object):
445
 
    """An object that acts like Paramiko's SFTPFile.readv()"""
 
408
    """An object that acts like Paramiko's SFTPFile when readv() is used"""
446
409
 
447
410
    def __init__(self, data):
448
411
        self._data = data
451
414
        for start, length in requests:
452
415
            yield self._data[start:start+length]
453
416
 
 
417
    def close(self):
 
418
        pass
 
419
 
454
420
 
455
421
def _null_report_activity(*a, **k):
456
422
    pass
510
476
            conf._get_config().update(
511
477
                {'sftptest': {'scheme': 'ssh', 'port': port, 'user': 'bar'}})
512
478
            conf._save()
513
 
        t = _mod_transport.get_transport('sftp://localhost:%d' % port)
 
479
        t = _mod_transport.get_transport_from_url(
 
480
            'sftp://localhost:%d' % port)
514
481
        # force a connection to be performed.
515
482
        t.has('foo')
516
483
        return t
529
496
        t = self.get_transport_for_connection(set_config=False)
530
497
        self.assertIs(None, t._get_credentials()[0])
531
498
        # No prompts should've been printed, stdin shouldn't have been read
532
 
        self.assertEquals("", stdout.getvalue())
533
 
        self.assertEquals(0, ui.ui_factory.stdin.tell())
 
499
        self.assertEqual("", stdout.getvalue())
 
500
        self.assertEqual(0, ui.ui_factory.stdin.tell())