~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

Rework test_script a little bit.


Don't allow someone to supply stdin input to echo.
Echo never reads from stdin; it just echoes its arguments.
Use 'cat' if you want to read from stdin.

A few other fixes because the tests were using filenames
that are actually illegal on Windows, rather than just
nonexistent.


Change the exception handling for commands so that
unknown errors don't get silently squashed and then
turn into hard-to-debug errors later.

test_script now passes on Windows.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import os
 
20
import subprocess
 
21
import sys
 
22
import threading
19
23
 
20
24
import bzrlib
21
25
from bzrlib import (
22
26
    errors,
23
27
    osutils,
 
28
    tests,
24
29
    urlutils,
25
30
    )
26
31
from bzrlib.errors import (DependencyNotPresent,
31
36
                           ReadError,
32
37
                           UnsupportedProtocol,
33
38
                           )
34
 
from bzrlib.tests import TestCase, TestCaseInTempDir
 
39
from bzrlib.tests import ParamikoFeature, TestCase, TestCaseInTempDir
35
40
from bzrlib.transport import (_clear_protocol_handlers,
36
41
                              _CoalescedOffset,
37
42
                              ConnectedTransport,
48
53
from bzrlib.transport.memory import MemoryTransport
49
54
from bzrlib.transport.local import (LocalTransport,
50
55
                                    EmulatedWin32LocalTransport)
 
56
from bzrlib.transport.pathfilter import PathFilteringServer
51
57
 
52
58
 
53
59
# TODO: Should possibly split transport-specific tests into their own files.
80
86
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
87
                                    'TestTransport.SampleHandler')
82
88
            self.assertEqual([SampleHandler.__module__,
83
 
                              'bzrlib.transport.chroot'],
 
89
                              'bzrlib.transport.chroot',
 
90
                              'bzrlib.transport.pathfilter'],
84
91
                             _get_transport_modules())
85
92
        finally:
86
93
            _set_protocol_handlers(handlers)
363
370
    def test_abspath(self):
364
371
        # The abspath is always relative to the chroot_url.
365
372
        server = ChrootServer(get_transport('memory:///foo/bar/'))
366
 
        server.setUp()
 
373
        self.start_server(server)
367
374
        transport = get_transport(server.get_url())
368
375
        self.assertEqual(server.get_url(), transport.abspath('/'))
369
376
 
370
377
        subdir_transport = transport.clone('subdir')
371
378
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
372
 
        server.tearDown()
373
379
 
374
380
    def test_clone(self):
375
381
        server = ChrootServer(get_transport('memory:///foo/bar/'))
376
 
        server.setUp()
 
382
        self.start_server(server)
377
383
        transport = get_transport(server.get_url())
378
384
        # relpath from root and root path are the same
379
385
        relpath_cloned = transport.clone('foo')
380
386
        abspath_cloned = transport.clone('/foo')
381
387
        self.assertEqual(server, relpath_cloned.server)
382
388
        self.assertEqual(server, abspath_cloned.server)
383
 
        server.tearDown()
384
389
 
385
390
    def test_chroot_url_preserves_chroot(self):
386
391
        """Calling get_transport on a chroot transport's base should produce a
393
398
            new_transport = get_transport(parent_url)
394
399
        """
395
400
        server = ChrootServer(get_transport('memory:///path/subpath'))
396
 
        server.setUp()
 
401
        self.start_server(server)
397
402
        transport = get_transport(server.get_url())
398
403
        new_transport = get_transport(transport.base)
399
404
        self.assertEqual(transport.server, new_transport.server)
400
405
        self.assertEqual(transport.base, new_transport.base)
401
 
        server.tearDown()
402
406
 
403
407
    def test_urljoin_preserves_chroot(self):
404
408
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
410
414
            new_transport = get_transport(parent_url)
411
415
        """
412
416
        server = ChrootServer(get_transport('memory:///path/'))
413
 
        server.setUp()
 
417
        self.start_server(server)
414
418
        transport = get_transport(server.get_url())
415
419
        self.assertRaises(
416
420
            InvalidURLJoin, urlutils.join, transport.base, '..')
417
 
        server.tearDown()
418
421
 
419
422
 
420
423
class ChrootServerTest(TestCase):
428
431
        backing_transport = MemoryTransport()
429
432
        server = ChrootServer(backing_transport)
430
433
        server.setUp()
431
 
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
434
        try:
 
435
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
436
        finally:
 
437
            server.tearDown()
432
438
 
433
439
    def test_tearDown(self):
434
440
        backing_transport = MemoryTransport()
441
447
        backing_transport = MemoryTransport()
442
448
        server = ChrootServer(backing_transport)
443
449
        server.setUp()
444
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
450
        try:
 
451
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
452
        finally:
 
453
            server.tearDown()
 
454
 
 
455
 
 
456
class PathFilteringDecoratorTransportTest(TestCase):
 
457
    """Pathfilter decoration specific tests."""
 
458
 
 
459
    def test_abspath(self):
 
460
        # The abspath is always relative to the base of the backing transport.
 
461
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
 
462
            lambda x: x)
 
463
        server.setUp()
 
464
        transport = get_transport(server.get_url())
 
465
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
466
 
 
467
        subdir_transport = transport.clone('subdir')
 
468
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
445
469
        server.tearDown()
446
470
 
 
471
    def make_pf_transport(self, filter_func=None):
 
472
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
473
        
 
474
        :param filter_func: by default this will be a no-op function.  Use this
 
475
            parameter to override it."""
 
476
        if filter_func is None:
 
477
            filter_func = lambda x: x
 
478
        server = PathFilteringServer(
 
479
            get_transport('memory:///foo/bar/'), filter_func)
 
480
        server.setUp()
 
481
        self.addCleanup(server.tearDown)
 
482
        return get_transport(server.get_url())
 
483
 
 
484
    def test__filter(self):
 
485
        # _filter (with an identity func as filter_func) always returns
 
486
        # paths relative to the base of the backing transport.
 
487
        transport = self.make_pf_transport()
 
488
        self.assertEqual('foo', transport._filter('foo'))
 
489
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
490
        self.assertEqual('', transport._filter('..'))
 
491
        self.assertEqual('', transport._filter('/'))
 
492
        # The base of the pathfiltering transport is taken into account too.
 
493
        transport = transport.clone('subdir1/subdir2')
 
494
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
495
        self.assertEqual(
 
496
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
497
        self.assertEqual('subdir1', transport._filter('..'))
 
498
        self.assertEqual('', transport._filter('/'))
 
499
 
 
500
    def test_filter_invocation(self):
 
501
        filter_log = []
 
502
        def filter(path):
 
503
            filter_log.append(path)
 
504
            return path
 
505
        transport = self.make_pf_transport(filter)
 
506
        transport.has('abc')
 
507
        self.assertEqual(['abc'], filter_log)
 
508
        del filter_log[:]
 
509
        transport.clone('abc').has('xyz')
 
510
        self.assertEqual(['abc/xyz'], filter_log)
 
511
        del filter_log[:]
 
512
        transport.has('/abc')
 
513
        self.assertEqual(['abc'], filter_log)
 
514
 
 
515
    def test_clone(self):
 
516
        transport = self.make_pf_transport()
 
517
        # relpath from root and root path are the same
 
518
        relpath_cloned = transport.clone('foo')
 
519
        abspath_cloned = transport.clone('/foo')
 
520
        self.assertEqual(transport.server, relpath_cloned.server)
 
521
        self.assertEqual(transport.server, abspath_cloned.server)
 
522
 
 
523
    def test_url_preserves_pathfiltering(self):
 
524
        """Calling get_transport on a pathfiltered transport's base should
 
525
        produce a transport with exactly the same behaviour as the original
 
526
        pathfiltered transport.
 
527
 
 
528
        This is so that it is not possible to escape (accidentally or
 
529
        otherwise) the filtering by doing::
 
530
            url = filtered_transport.base
 
531
            parent_url = urlutils.join(url, '..')
 
532
            new_transport = get_transport(parent_url)
 
533
        """
 
534
        transport = self.make_pf_transport()
 
535
        new_transport = get_transport(transport.base)
 
536
        self.assertEqual(transport.server, new_transport.server)
 
537
        self.assertEqual(transport.base, new_transport.base)
 
538
 
447
539
 
448
540
class ReadonlyDecoratorTransportTest(TestCase):
449
541
    """Readonly decoration specific tests."""
460
552
        import bzrlib.transport.readonly as readonly
461
553
        # connect to '.' via http which is not listable
462
554
        server = HttpServer()
463
 
        server.setUp()
464
 
        try:
465
 
            transport = get_transport('readonly+' + server.get_url())
466
 
            self.failUnless(isinstance(transport,
467
 
                                       readonly.ReadonlyTransportDecorator))
468
 
            self.assertEqual(False, transport.listable())
469
 
            self.assertEqual(True, transport.is_readonly())
470
 
        finally:
471
 
            server.tearDown()
 
555
        self.start_server(server)
 
556
        transport = get_transport('readonly+' + server.get_url())
 
557
        self.failUnless(isinstance(transport,
 
558
                                   readonly.ReadonlyTransportDecorator))
 
559
        self.assertEqual(False, transport.listable())
 
560
        self.assertEqual(True, transport.is_readonly())
472
561
 
473
562
 
474
563
class FakeNFSDecoratorTests(TestCaseInTempDir):
492
581
        from bzrlib.tests.http_server import HttpServer
493
582
        # connect to '.' via http which is not listable
494
583
        server = HttpServer()
495
 
        server.setUp()
496
 
        try:
497
 
            transport = self.get_nfs_transport(server.get_url())
498
 
            self.assertIsInstance(
499
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
500
 
            self.assertEqual(False, transport.listable())
501
 
            self.assertEqual(True, transport.is_readonly())
502
 
        finally:
503
 
            server.tearDown()
 
584
        self.start_server(server)
 
585
        transport = self.get_nfs_transport(server.get_url())
 
586
        self.assertIsInstance(
 
587
            transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
588
        self.assertEqual(False, transport.listable())
 
589
        self.assertEqual(True, transport.is_readonly())
504
590
 
505
591
    def test_fakenfs_server_default(self):
506
592
        # a FakeNFSServer() should bring up a local relpath server for itself
507
593
        import bzrlib.transport.fakenfs as fakenfs
508
594
        server = fakenfs.FakeNFSServer()
509
 
        server.setUp()
510
 
        try:
511
 
            # the url should be decorated appropriately
512
 
            self.assertStartsWith(server.get_url(), 'fakenfs+')
513
 
            # and we should be able to get a transport for it
514
 
            transport = get_transport(server.get_url())
515
 
            # which must be a FakeNFSTransportDecorator instance.
516
 
            self.assertIsInstance(
517
 
                transport, fakenfs.FakeNFSTransportDecorator)
518
 
        finally:
519
 
            server.tearDown()
 
595
        self.start_server(server)
 
596
        # the url should be decorated appropriately
 
597
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
598
        # and we should be able to get a transport for it
 
599
        transport = get_transport(server.get_url())
 
600
        # which must be a FakeNFSTransportDecorator instance.
 
601
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
520
602
 
521
603
    def test_fakenfs_rename_semantics(self):
522
604
        # a FakeNFS transport must mangle the way rename errors occur to
587
669
    def setUp(self):
588
670
        super(TestTransportImplementation, self).setUp()
589
671
        self._server = self.transport_server()
590
 
        self._server.setUp()
591
 
        self.addCleanup(self._server.tearDown)
 
672
        self.start_server(self._server)
592
673
 
593
674
    def get_transport(self, relpath=None):
594
675
        """Return a connected transport to the local directory.
800
881
        # readv records the supplied offset request
801
882
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
802
883
        self.assertEqual(expected_result, transport._activity)
 
884
 
 
885
 
 
886
class TestSSHConnections(tests.TestCaseWithTransport):
 
887
 
 
888
    def test_bzr_connect_to_bzr_ssh(self):
 
889
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
890
 
 
891
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
 
892
        """
 
893
        # This test actually causes a bzr instance to be invoked, which is very
 
894
        # expensive: it should be the only such test in the test suite.
 
895
        # A reasonable evolution for this would be to simply check inside
 
896
        # check_channel_exec_request that the command is appropriate, and then
 
897
        # satisfy requests in-process.
 
898
        self.requireFeature(ParamikoFeature)
 
899
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
 
900
        # override the interface (doesn't change self._vendor).
 
901
        # Note that this does encryption, so can be slow.
 
902
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
903
        from bzrlib.tests.stub_sftp import StubServer
 
904
 
 
905
        # Start an SSH server
 
906
        self.command_executed = []
 
907
        # XXX: This is horrible -- we define a really dumb SSH server that
 
908
        # executes commands, and manage the hooking up of stdin/out/err to the
 
909
        # SSH channel ourselves.  Surely this has already been implemented
 
910
        # elsewhere?
 
911
        class StubSSHServer(StubServer):
 
912
 
 
913
            test = self
 
914
 
 
915
            def check_channel_exec_request(self, channel, command):
 
916
                self.test.command_executed.append(command)
 
917
                proc = subprocess.Popen(
 
918
                    command, shell=True, stdin=subprocess.PIPE,
 
919
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
920
 
 
921
                # XXX: horribly inefficient, not to mention ugly.
 
922
                # Start a thread for each of stdin/out/err, and relay bytes from
 
923
                # the subprocess to channel and vice versa.
 
924
                def ferry_bytes(read, write, close):
 
925
                    while True:
 
926
                        bytes = read(1)
 
927
                        if bytes == '':
 
928
                            close()
 
929
                            break
 
930
                        write(bytes)
 
931
 
 
932
                file_functions = [
 
933
                    (channel.recv, proc.stdin.write, proc.stdin.close),
 
934
                    (proc.stdout.read, channel.sendall, channel.close),
 
935
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
 
936
                for read, write, close in file_functions:
 
937
                    t = threading.Thread(
 
938
                        target=ferry_bytes, args=(read, write, close))
 
939
                    t.start()
 
940
 
 
941
                return True
 
942
 
 
943
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
 
944
        # We *don't* want to override the default SSH vendor: the detected one
 
945
        # is the one to use.
 
946
        self.start_server(ssh_server)
 
947
        port = ssh_server._listener.port
 
948
 
 
949
        if sys.platform == 'win32':
 
950
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
951
        else:
 
952
            bzr_remote_path = self.get_bzr_path()
 
953
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
954
 
 
955
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
 
956
        # variable is used to tell bzr what command to run on the remote end.
 
957
        path_to_branch = osutils.abspath('.')
 
958
        if sys.platform == 'win32':
 
959
            # On Windows, we export all drives as '/C:/, etc. So we need to
 
960
            # prefix a '/' to get the right path.
 
961
            path_to_branch = '/' + path_to_branch
 
962
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
 
963
        t = get_transport(url)
 
964
        self.permit_url(t.base)
 
965
        t.mkdir('foo')
 
966
 
 
967
        self.assertEqual(
 
968
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
969
            self.command_executed)