~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-07-25 14:01:28 UTC
  • mfrom: (6039.1.9 transport-from-url)
  • Revision ID: pqm@pqm.ubuntu.com-20110725140128-croovh96z0rs57yy
(jelmer) Add get_transport_from_url and get_transport_from_path functions.
 (Jelmer Vernooij)

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import os
19
20
import subprocess
20
21
import sys
21
22
import threading
38
39
    pathfilter,
39
40
    readonly,
40
41
    )
 
42
import bzrlib.transport.trace
41
43
from bzrlib.tests import (
42
44
    features,
43
45
    test_server,
89
91
        transport.register_lazy_transport(
90
92
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
91
93
        try:
92
 
            transport.get_transport('foo://fooserver/foo')
 
94
            transport.get_transport_from_url('foo://fooserver/foo')
93
95
        except errors.UnsupportedProtocol, e:
94
96
            e_str = str(e)
95
97
            self.assertEquals('Unsupported protocol'
109
111
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
110
112
        transport.register_lazy_transport(
111
113
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
112
 
        t = transport.get_transport('foo://fooserver/foo')
 
114
        t = transport.get_transport_from_url('foo://fooserver/foo')
113
115
        self.assertTrue(isinstance(t, BackupTransportHandler))
114
116
 
115
117
    def test_ssh_hints(self):
116
118
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
117
119
        try:
118
 
            transport.get_transport('ssh://fooserver/foo')
 
120
            transport.get_transport_from_url('ssh://fooserver/foo')
119
121
        except errors.UnsupportedProtocol, e:
120
122
            e_str = str(e)
121
123
            self.assertEquals('Unsupported protocol'
245
247
        server.start_server()
246
248
        url = server.get_url()
247
249
        self.assertTrue(url in transport.transport_list_registry)
248
 
        t = transport.get_transport(url)
 
250
        t = transport.get_transport_from_url(url)
249
251
        del t
250
252
        server.stop_server()
251
253
        self.assertFalse(url in transport.transport_list_registry)
366
368
    def test_abspath(self):
367
369
        # The abspath is always relative to the chroot_url.
368
370
        server = chroot.ChrootServer(
369
 
            transport.get_transport('memory:///foo/bar/'))
 
371
            transport.get_transport_from_url('memory:///foo/bar/'))
370
372
        self.start_server(server)
371
 
        t = transport.get_transport(server.get_url())
 
373
        t = transport.get_transport_from_url(server.get_url())
372
374
        self.assertEqual(server.get_url(), t.abspath('/'))
373
375
 
374
376
        subdir_t = t.clone('subdir')
376
378
 
377
379
    def test_clone(self):
378
380
        server = chroot.ChrootServer(
379
 
            transport.get_transport('memory:///foo/bar/'))
 
381
            transport.get_transport_from_url('memory:///foo/bar/'))
380
382
        self.start_server(server)
381
 
        t = transport.get_transport(server.get_url())
 
383
        t = transport.get_transport_from_url(server.get_url())
382
384
        # relpath from root and root path are the same
383
385
        relpath_cloned = t.clone('foo')
384
386
        abspath_cloned = t.clone('/foo')
393
395
        This is so that it is not possible to escape a chroot by doing::
394
396
            url = chroot_transport.base
395
397
            parent_url = urlutils.join(url, '..')
396
 
            new_t = transport.get_transport(parent_url)
 
398
            new_t = transport.get_transport_from_url(parent_url)
397
399
        """
398
400
        server = chroot.ChrootServer(
399
 
            transport.get_transport('memory:///path/subpath'))
 
401
            transport.get_transport_from_url('memory:///path/subpath'))
400
402
        self.start_server(server)
401
 
        t = transport.get_transport(server.get_url())
402
 
        new_t = transport.get_transport(t.base)
 
403
        t = transport.get_transport_from_url(server.get_url())
 
404
        new_t = transport.get_transport_from_url(t.base)
403
405
        self.assertEqual(t.server, new_t.server)
404
406
        self.assertEqual(t.base, new_t.base)
405
407
 
410
412
        This is so that it is not possible to escape a chroot by doing::
411
413
            url = chroot_transport.base
412
414
            parent_url = urlutils.join(url, '..')
413
 
            new_t = transport.get_transport(parent_url)
 
415
            new_t = transport.get_transport_from_url(parent_url)
414
416
        """
415
417
        server = chroot.ChrootServer(
416
 
            transport.get_transport('memory:///path/'))
 
418
            transport.get_transport_from_url('memory:///path/'))
417
419
        self.start_server(server)
418
 
        t = transport.get_transport(server.get_url())
 
420
        t = transport.get_transport_from_url(server.get_url())
419
421
        self.assertRaises(
420
422
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
421
423
 
457
459
    def test_abspath(self):
458
460
        # The abspath is always relative to the base of the backing transport.
459
461
        server = pathfilter.PathFilteringServer(
460
 
            transport.get_transport('memory:///foo/bar/'),
 
462
            transport.get_transport_from_url('memory:///foo/bar/'),
461
463
            lambda x: x)
462
464
        server.start_server()
463
 
        t = transport.get_transport(server.get_url())
 
465
        t = transport.get_transport_from_url(server.get_url())
464
466
        self.assertEqual(server.get_url(), t.abspath('/'))
465
467
 
466
468
        subdir_t = t.clone('subdir')
475
477
        if filter_func is None:
476
478
            filter_func = lambda x: x
477
479
        server = pathfilter.PathFilteringServer(
478
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
480
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
479
481
        server.start_server()
480
482
        self.addCleanup(server.stop_server)
481
 
        return transport.get_transport(server.get_url())
 
483
        return transport.get_transport_from_url(server.get_url())
482
484
 
483
485
    def test__filter(self):
484
486
        # _filter (with an identity func as filter_func) always returns
528
530
        otherwise) the filtering by doing::
529
531
            url = filtered_transport.base
530
532
            parent_url = urlutils.join(url, '..')
531
 
            new_t = transport.get_transport(parent_url)
 
533
            new_t = transport.get_transport_from_url(parent_url)
532
534
        """
533
535
        t = self.make_pf_transport()
534
 
        new_t = transport.get_transport(t.base)
 
536
        new_t = transport.get_transport_from_url(t.base)
535
537
        self.assertEqual(t.server, new_t.server)
536
538
        self.assertEqual(t.base, new_t.base)
537
539
 
550
552
        # connect to '.' via http which is not listable
551
553
        server = HttpServer()
552
554
        self.start_server(server)
553
 
        t = transport.get_transport('readonly+' + server.get_url())
 
555
        t = transport.get_transport_from_url('readonly+' + server.get_url())
554
556
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
555
557
        self.assertEqual(False, t.listable())
556
558
        self.assertEqual(True, t.is_readonly())
589
591
        # the url should be decorated appropriately
590
592
        self.assertStartsWith(server.get_url(), 'fakenfs+')
591
593
        # and we should be able to get a transport for it
592
 
        t = transport.get_transport(server.get_url())
 
594
        t = transport.get_transport_from_url(server.get_url())
593
595
        # which must be a FakeNFSTransportDecorator instance.
594
596
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
595
597
 
672
674
        base_url = self._server.get_url()
673
675
        url = self._adjust_url(base_url, relpath)
674
676
        # try getting the transport via the regular interface:
675
 
        t = transport.get_transport(url)
 
677
        t = transport.get_transport_from_url(url)
676
678
        # vila--20070607 if the following are commented out the test suite
677
679
        # still passes. Is this really still needed or was it a forgotten
678
680
        # temporary fix?
683
685
        return t
684
686
 
685
687
 
 
688
class TestTransportFromPath(tests.TestCaseInTempDir):
 
689
 
 
690
    def test_with_path(self):
 
691
        t = transport.get_transport_from_path(self.test_dir)
 
692
        self.assertIsInstance(t, local.LocalTransport)
 
693
        self.assertEquals(t.base.rstrip("/"),
 
694
            urlutils.local_path_to_url(self.test_dir))
 
695
 
 
696
    def test_with_url(self):
 
697
        t = transport.get_transport_from_path("file:")
 
698
        self.assertIsInstance(t, local.LocalTransport)
 
699
        self.assertEquals(t.base.rstrip("/"),
 
700
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
701
 
 
702
 
 
703
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
704
 
 
705
    def test_with_path(self):
 
706
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
707
            self.test_dir)
 
708
 
 
709
    def test_with_url(self):
 
710
        url = urlutils.local_path_to_url(self.test_dir)
 
711
        t = transport.get_transport_from_url(url)
 
712
        self.assertIsInstance(t, local.LocalTransport)
 
713
        self.assertEquals(t.base.rstrip("/"), url)
 
714
 
 
715
 
686
716
class TestLocalTransports(tests.TestCase):
687
717
 
688
718
    def test_get_transport_from_abspath(self):
815
845
 
816
846
    def test_reuse_same_transport(self):
817
847
        possible_transports = []
818
 
        t1 = transport.get_transport('http://foo/',
 
848
        t1 = transport.get_transport_from_url('http://foo/',
819
849
                                     possible_transports=possible_transports)
820
850
        self.assertEqual([t1], possible_transports)
821
 
        t2 = transport.get_transport('http://foo/',
 
851
        t2 = transport.get_transport_from_url('http://foo/',
822
852
                                     possible_transports=[t1])
823
853
        self.assertIs(t1, t2)
824
854
 
825
855
        # Also check that a final '/' is handled correctly
826
 
        t3 = transport.get_transport('http://foo/path/')
827
 
        t4 = transport.get_transport('http://foo/path',
 
856
        t3 = transport.get_transport_from_url('http://foo/path/')
 
857
        t4 = transport.get_transport_from_url('http://foo/path',
828
858
                                     possible_transports=[t3])
829
859
        self.assertIs(t3, t4)
830
860
 
831
 
        t5 = transport.get_transport('http://foo/path')
832
 
        t6 = transport.get_transport('http://foo/path/',
 
861
        t5 = transport.get_transport_from_url('http://foo/path')
 
862
        t6 = transport.get_transport_from_url('http://foo/path/',
833
863
                                     possible_transports=[t5])
834
864
        self.assertIs(t5, t6)
835
865
 
836
866
    def test_don_t_reuse_different_transport(self):
837
 
        t1 = transport.get_transport('http://foo/path')
838
 
        t2 = transport.get_transport('http://bar/path',
 
867
        t1 = transport.get_transport_from_url('http://foo/path')
 
868
        t2 = transport.get_transport_from_url('http://bar/path',
839
869
                                     possible_transports=[t1])
840
870
        self.assertIsNot(t1, t2)
841
871
 
842
872
 
843
873
class TestTransportTrace(tests.TestCase):
844
874
 
845
 
    def test_get(self):
846
 
        t = transport.get_transport('trace+memory://')
 
875
    def test_decorator(self):
 
876
        t = transport.get_transport_from_url('trace+memory://')
847
877
        self.assertIsInstance(
848
878
            t, bzrlib.transport.trace.TransportTraceDecorator)
849
879
 
850
880
    def test_clone_preserves_activity(self):
851
 
        t = transport.get_transport('trace+memory://')
 
881
        t = transport.get_transport_from_url('trace+memory://')
852
882
        t2 = t.clone('.')
853
883
        self.assertTrue(t is not t2)
854
884
        self.assertTrue(t._activity is t2._activity)
858
888
    # still won't cause a test failure when the top level Transport API
859
889
    # changes; so there is little return doing that.
860
890
    def test_get(self):
861
 
        t = transport.get_transport('trace+memory:///')
 
891
        t = transport.get_transport_from_url('trace+memory:///')
862
892
        t.put_bytes('foo', 'barish')
863
893
        t.get('foo')
864
894
        expected_result = []
870
900
        self.assertEqual(expected_result, t._activity)
871
901
 
872
902
    def test_readv(self):
873
 
        t = transport.get_transport('trace+memory:///')
 
903
        t = transport.get_transport_from_url('trace+memory:///')
874
904
        t.put_bytes('foo', 'barish')
875
905
        list(t.readv('foo', [(0, 1), (3, 2)],
876
906
                     adjust_for_latency=True, upper_limit=6))