~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp_transport.py

  • Committer: Robert Collins
  • Date: 2007-03-08 04:06:06 UTC
  • mfrom: (2323.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 2442.
  • Revision ID: robertc@robertcollins.net-20070308040606-84gsniv56huiyjt4
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
2
 
 
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
17
import os
18
18
import socket
19
19
import threading
 
20
import time
20
21
 
21
22
import bzrlib.bzrdir as bzrdir
22
23
import bzrlib.errors as errors
23
 
from bzrlib.osutils import pathjoin, lexists
 
24
from bzrlib.osutils import pathjoin, lexists, set_or_unset_env
24
25
from bzrlib.tests import TestCaseWithTransport, TestCase, TestSkipped
 
26
from bzrlib.tests.HttpServer import HttpServer
25
27
import bzrlib.transport
 
28
from bzrlib.transport import get_transport
 
29
import bzrlib.transport.http
26
30
from bzrlib.workingtree import WorkingTree
27
31
 
28
32
try:
32
36
    paramiko_loaded = False
33
37
 
34
38
 
 
39
def set_test_transport_to_sftp(testcase):
 
40
    """A helper to set transports on test case instances."""
 
41
    from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
 
42
    if getattr(testcase, '_get_remote_is_absolute', None) is None:
 
43
        testcase._get_remote_is_absolute = True
 
44
    if testcase._get_remote_is_absolute:
 
45
        testcase.transport_server = SFTPAbsoluteServer
 
46
    else:
 
47
        testcase.transport_server = SFTPHomeDirServer
 
48
    testcase.transport_readonly_server = HttpServer
 
49
 
 
50
 
35
51
class TestCaseWithSFTPServer(TestCaseWithTransport):
36
52
    """A test case base class that provides a sftp server on localhost."""
37
53
 
38
54
    def setUp(self):
39
 
        if not paramiko_loaded:
40
 
            raise TestSkipped('you must have paramiko to run this test')
41
55
        super(TestCaseWithSFTPServer, self).setUp()
42
 
        from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
43
 
        if getattr(self, '_get_remote_is_absolute', None) is None:
44
 
            self._get_remote_is_absolute = True
45
 
        if self._get_remote_is_absolute:
46
 
            self.transport_server = SFTPAbsoluteServer
47
 
        else:
48
 
            self.transport_server = SFTPHomeDirServer
49
 
        self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
56
        if not paramiko_loaded:
 
57
            raise TestSkipped('you must have paramiko to run this test')
 
58
        set_test_transport_to_sftp(self) 
50
59
 
51
60
    def get_transport(self, path=None):
52
61
        """Return a transport relative to self._test_root."""
108
117
        ### FIXME decide and then test for all transports. RBC20051208
109
118
 
110
119
 
111
 
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
 
120
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
112
121
    """Test the SFTP transport with homedir based relative paths."""
113
122
 
114
123
    def setUp(self):
115
124
        self._get_remote_is_absolute = False
116
 
        super(SFTPTransportTestRelative, self).setUp()
 
125
        super(SFTPTransportTestRelativeRoot, self).setUp()
117
126
 
118
127
    def test__remote_path_relative_root(self):
119
128
        # relative paths are preserved
120
129
        t = self.get_transport('')
 
130
        # the remote path should be ''
 
131
        self.assertEqual('', t._path)
121
132
        self.assertEqual('a', t._remote_path('a'))
122
133
 
123
134
 
185
196
                              clone_from=fake)
186
197
            self.fail('expected exception not raised')
187
198
        except TransportError, e:
188
 
            self.assertEquals(str(e), 
189
 
                    '~janneke: invalid port number')
 
199
            self.assertEquals(str(e),
 
200
                    'Transport error: '
 
201
                    'invalid port number ~janneke in url:\n'
 
202
                    'sftp://lilypond.org:~janneke/public_html/bzr/gub ')
 
203
 
 
204
    def test_get_paramiko_vendor(self):
 
205
        """Test that if no 'ssh' is available we get builtin paramiko"""
 
206
        from bzrlib.transport import ssh
 
207
        # set '.' as the only location in the path, forcing no 'ssh' to exist
 
208
        orig_vendor = ssh._ssh_vendor
 
209
        orig_path = set_or_unset_env('PATH', '.')
 
210
        try:
 
211
            # No vendor defined yet, query for one
 
212
            ssh._ssh_vendor = None
 
213
            vendor = ssh._get_ssh_vendor()
 
214
            self.assertIsInstance(vendor, ssh.ParamikoVendor)
 
215
        finally:
 
216
            set_or_unset_env('PATH', orig_path)
 
217
            ssh._ssh_vendor = orig_vendor
 
218
 
 
219
    def test_abspath_root_sibling_server(self):
 
220
        from bzrlib.transport.sftp import SFTPSiblingAbsoluteServer
 
221
        server = SFTPSiblingAbsoluteServer()
 
222
        server.setUp()
 
223
        try:
 
224
            transport = get_transport(server.get_url())
 
225
            self.assertFalse(transport.abspath('/').endswith('/~/'))
 
226
            self.assertTrue(transport.abspath('/').endswith('/'))
 
227
            del transport
 
228
        finally:
 
229
            server.tearDown()
190
230
 
191
231
 
192
232
class SFTPBranchTest(TestCaseWithSFTPServer):
225
265
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
226
266
 
227
267
 
228
 
class SFTPFullHandshakingTest(TestCaseWithSFTPServer):
229
 
    """Verify that a full-handshake (SSH over loopback TCP) sftp connection works."""
 
268
class SSHVendorConnection(TestCaseWithSFTPServer):
 
269
    """Test that the ssh vendors can all connect.
 
270
 
 
271
    Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
 
272
 
 
273
    We have 3 sftp implementations in the test suite:
 
274
      'loopback': Doesn't use ssh, just uses a local socket. Most tests are
 
275
                  done this way to save the handshaking time, so it is not
 
276
                  tested again here
 
277
      'none':     This uses paramiko's built-in ssh client and server, and layers
 
278
                  sftp on top of it.
 
279
      None:       If 'ssh' exists on the machine, then it will be spawned as a
 
280
                  child process.
 
281
    """
230
282
    
231
 
    def test_connection(self):
 
283
    def setUp(self):
 
284
        super(SSHVendorConnection, self).setUp()
232
285
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
233
 
        self.transport_server = SFTPFullAbsoluteServer
234
 
        self.get_transport()
 
286
 
 
287
        def create_server():
 
288
            """Just a wrapper so that when created, it will set _vendor"""
 
289
            # SFTPFullAbsoluteServer can handle any vendor,
 
290
            # it just needs to be set between the time it is instantiated
 
291
            # and the time .setUp() is called
 
292
            server = SFTPFullAbsoluteServer()
 
293
            server._vendor = self._test_vendor
 
294
            return server
 
295
        self._test_vendor = 'loopback'
 
296
        self.transport_server = create_server
 
297
        f = open('a_file', 'wb')
 
298
        try:
 
299
            f.write('foobar\n')
 
300
        finally:
 
301
            f.close()
 
302
 
 
303
    def set_vendor(self, vendor):
 
304
        self._test_vendor = vendor
 
305
 
 
306
    def test_connection_paramiko(self):
 
307
        from bzrlib.transport import ssh
 
308
        self.set_vendor(ssh.ParamikoVendor())
 
309
        t = self.get_transport()
 
310
        self.assertEqual('foobar\n', t.get('a_file').read())
 
311
 
 
312
    def test_connection_vendor(self):
 
313
        raise TestSkipped("We don't test spawning real ssh,"
 
314
                          " because it prompts for a password."
 
315
                          " Enable this test if we figure out"
 
316
                          " how to prevent this.")
 
317
        self.set_vendor(None)
 
318
        t = self.get_transport()
 
319
        self.assertEqual('foobar\n', t.get('a_file').read())
 
320
 
 
321
 
 
322
class SSHVendorBadConnection(TestCaseWithTransport):
 
323
    """Test that the ssh vendors handle bad connection properly
 
324
 
 
325
    We don't subclass TestCaseWithSFTPServer, because we don't actually
 
326
    need an SFTP connection.
 
327
    """
 
328
 
 
329
    def setUp(self):
 
330
        if not paramiko_loaded:
 
331
            raise TestSkipped('you must have paramiko to run this test')
 
332
        super(SSHVendorBadConnection, self).setUp()
 
333
        import bzrlib.transport.ssh
 
334
 
 
335
        # open a random port, so we know nobody else is using it
 
336
        # but don't actually listen on the port.
 
337
        s = socket.socket()
 
338
        s.bind(('localhost', 0))
 
339
        self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
 
340
 
 
341
        orig_vendor = bzrlib.transport.ssh._ssh_vendor
 
342
        def reset():
 
343
            bzrlib.transport.ssh._ssh_vendor = orig_vendor
 
344
            s.close()
 
345
        self.addCleanup(reset)
 
346
 
 
347
    def set_vendor(self, vendor):
 
348
        import bzrlib.transport.ssh
 
349
        bzrlib.transport.ssh._ssh_vendor = vendor
 
350
 
 
351
    def test_bad_connection_paramiko(self):
 
352
        """Test that a real connection attempt raises the right error"""
 
353
        from bzrlib.transport import ssh
 
354
        self.set_vendor(ssh.ParamikoVendor())
 
355
        self.assertRaises(errors.ConnectionError,
 
356
                          bzrlib.transport.get_transport, self.bogus_url)
 
357
 
 
358
    def test_bad_connection_ssh(self):
 
359
        """None => auto-detect vendor"""
 
360
        self.set_vendor(None)
 
361
        # This is how I would normally test the connection code
 
362
        # it makes it very clear what we are testing.
 
363
        # However, 'ssh' will create stipple on the output, so instead
 
364
        # I'm using run_bzr_subprocess, and parsing the output
 
365
        # try:
 
366
        #     t = bzrlib.transport.get_transport(self.bogus_url)
 
367
        # except errors.ConnectionError:
 
368
        #     # Correct error
 
369
        #     pass
 
370
        # except errors.NameError, e:
 
371
        #     if 'SSHException' in str(e):
 
372
        #         raise TestSkipped('Known NameError bug in paramiko 1.6.1')
 
373
        #     raise
 
374
        # else:
 
375
        #     self.fail('Excepted ConnectionError to be raised')
 
376
 
 
377
        out, err = self.run_bzr_subprocess('log', self.bogus_url, retcode=3)
 
378
        self.assertEqual('', out)
 
379
        if "NameError: global name 'SSHException'" in err:
 
380
            # We aren't fixing this bug, because it is a bug in
 
381
            # paramiko, but we know about it, so we don't have to
 
382
            # fail the test
 
383
            raise TestSkipped('Known NameError bug with paramiko-1.6.1')
 
384
        self.assertContainsRe(err, r'bzr: ERROR: Unable to connect to SSH host'
 
385
                                   r' 127\.0\.0\.1:\d+; ')
 
386
 
 
387
 
 
388
class SFTPLatencyKnob(TestCaseWithSFTPServer):
 
389
    """Test that the testing SFTPServer's latency knob works."""
 
390
 
 
391
    def test_latency_knob_slows_transport(self):
 
392
        # change the latency knob to 500ms. We take about 40ms for a 
 
393
        # loopback connection ordinarily.
 
394
        start_time = time.time()
 
395
        self.get_server().add_latency = 0.5
 
396
        transport = self.get_transport()
 
397
        with_latency_knob_time = time.time() - start_time
 
398
        self.assertTrue(with_latency_knob_time > 0.4)
 
399
 
 
400
    def test_default(self):
 
401
        # This test is potentially brittle: under extremely high machine load
 
402
        # it could fail, but that is quite unlikely
 
403
        start_time = time.time()
 
404
        transport = self.get_transport()
 
405
        regular_time = time.time() - start_time
 
406
        self.assertTrue(regular_time < 0.5)
 
407
 
 
408
 
 
409
class FakeSocket(object):
 
410
    """Fake socket object used to test the SocketDelay wrapper without
 
411
    using a real socket.
 
412
    """
 
413
 
 
414
    def __init__(self):
 
415
        self._data = ""
 
416
 
 
417
    def send(self, data, flags=0):
 
418
        self._data += data
 
419
        return len(data)
 
420
 
 
421
    def sendall(self, data, flags=0):
 
422
        self._data += data
 
423
        return len(data)
 
424
 
 
425
    def recv(self, size, flags=0):
 
426
        if size < len(self._data):
 
427
            result = self._data[:size]
 
428
            self._data = self._data[size:]
 
429
            return result
 
430
        else:
 
431
            result = self._data
 
432
            self._data = ""
 
433
            return result
 
434
 
 
435
 
 
436
class TestSocketDelay(TestCase):
 
437
 
 
438
    def setUp(self):
 
439
        TestCase.setUp(self)
 
440
        if not paramiko_loaded:
 
441
            raise TestSkipped('you must have paramiko to run this test')
 
442
 
 
443
    def test_delay(self):
 
444
        from bzrlib.transport.sftp import SocketDelay
 
445
        sending = FakeSocket()
 
446
        receiving = SocketDelay(sending, 0.1, bandwidth=1000000,
 
447
                                really_sleep=False)
 
448
        # check that simulated time is charged only per round-trip:
 
449
        t1 = SocketDelay.simulated_time
 
450
        receiving.send("connect1")
 
451
        self.assertEqual(sending.recv(1024), "connect1")
 
452
        t2 = SocketDelay.simulated_time
 
453
        self.assertAlmostEqual(t2 - t1, 0.1)
 
454
        receiving.send("connect2")
 
455
        self.assertEqual(sending.recv(1024), "connect2")
 
456
        sending.send("hello")
 
457
        self.assertEqual(receiving.recv(1024), "hello")
 
458
        t3 = SocketDelay.simulated_time
 
459
        self.assertAlmostEqual(t3 - t2, 0.1)
 
460
        sending.send("hello")
 
461
        self.assertEqual(receiving.recv(1024), "hello")
 
462
        sending.send("hello")
 
463
        self.assertEqual(receiving.recv(1024), "hello")
 
464
        sending.send("hello")
 
465
        self.assertEqual(receiving.recv(1024), "hello")
 
466
        t4 = SocketDelay.simulated_time
 
467
        self.assertAlmostEqual(t4, t3)
 
468
 
 
469
    def test_bandwidth(self):
 
470
        from bzrlib.transport.sftp import SocketDelay
 
471
        sending = FakeSocket()
 
472
        receiving = SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
 
473
                                really_sleep=False)
 
474
        # check that simulated time is charged only per round-trip:
 
475
        t1 = SocketDelay.simulated_time
 
476
        receiving.send("connect")
 
477
        self.assertEqual(sending.recv(1024), "connect")
 
478
        sending.send("a" * 100)
 
479
        self.assertEqual(receiving.recv(1024), "a" * 100)
 
480
        t2 = SocketDelay.simulated_time
 
481
        self.assertAlmostEqual(t2 - t1, 100 + 7)
 
482
 
 
483