~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Vincent Ladeuil
  • Date: 2011-08-12 09:49:24 UTC
  • mfrom: (6015.9.10 2.4)
  • mto: This revision was merged to the branch mainline in revision 6066.
  • Revision ID: v.ladeuil+lp@free.fr-20110812094924-knc5s0g7vs31a2f1
Merge 2.4 into trunk

Show diffs side-by-side

added

removed

Lines of Context:
28
28
    transport,
29
29
    urlutils,
30
30
    )
 
31
from bzrlib.directory_service import directories
31
32
from bzrlib.transport import (
32
33
    chroot,
33
34
    fakenfs,
34
35
    http,
35
36
    local,
 
37
    location_to_url,
36
38
    memory,
37
39
    pathfilter,
38
40
    readonly,
39
41
    )
 
42
import bzrlib.transport.trace
40
43
from bzrlib.tests import (
41
44
    features,
42
45
    test_server,
49
52
class TestTransport(tests.TestCase):
50
53
    """Test the non transport-concrete class functionality."""
51
54
 
52
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
53
 
    # of try/finally -- vila 20100205
54
 
 
55
55
    def test__get_set_protocol_handlers(self):
56
56
        handlers = transport._get_protocol_handlers()
57
 
        self.assertNotEqual([], handlers.keys( ))
58
 
        try:
59
 
            transport._clear_protocol_handlers()
60
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
61
 
        finally:
62
 
            transport._set_protocol_handlers(handlers)
 
57
        self.assertNotEqual([], handlers.keys())
 
58
        transport._clear_protocol_handlers()
 
59
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
60
        self.assertEqual([], transport._get_protocol_handlers().keys())
63
61
 
64
62
    def test_get_transport_modules(self):
65
63
        handlers = transport._get_protocol_handlers()
 
64
        self.addCleanup(transport._set_protocol_handlers, handlers)
66
65
        # don't pollute the current handlers
67
66
        transport._clear_protocol_handlers()
 
67
 
68
68
        class SampleHandler(object):
69
69
            """I exist, isnt that enough?"""
70
 
        try:
71
 
            transport._clear_protocol_handlers()
72
 
            transport.register_transport_proto('foo')
73
 
            transport.register_lazy_transport('foo',
74
 
                                              'bzrlib.tests.test_transport',
75
 
                                              'TestTransport.SampleHandler')
76
 
            transport.register_transport_proto('bar')
77
 
            transport.register_lazy_transport('bar',
78
 
                                              'bzrlib.tests.test_transport',
79
 
                                              'TestTransport.SampleHandler')
80
 
            self.assertEqual([SampleHandler.__module__,
81
 
                              'bzrlib.transport.chroot',
82
 
                              'bzrlib.transport.pathfilter'],
83
 
                             transport._get_transport_modules())
84
 
        finally:
85
 
            transport._set_protocol_handlers(handlers)
 
70
        transport._clear_protocol_handlers()
 
71
        transport.register_transport_proto('foo')
 
72
        transport.register_lazy_transport('foo',
 
73
                                            'bzrlib.tests.test_transport',
 
74
                                            'TestTransport.SampleHandler')
 
75
        transport.register_transport_proto('bar')
 
76
        transport.register_lazy_transport('bar',
 
77
                                            'bzrlib.tests.test_transport',
 
78
                                            'TestTransport.SampleHandler')
 
79
        self.assertEqual([SampleHandler.__module__,
 
80
                            'bzrlib.transport.chroot',
 
81
                            'bzrlib.transport.pathfilter'],
 
82
                            transport._get_transport_modules())
86
83
 
87
84
    def test_transport_dependency(self):
88
85
        """Transport with missing dependency causes no error"""
89
86
        saved_handlers = transport._get_protocol_handlers()
 
87
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
90
88
        # don't pollute the current handlers
91
89
        transport._clear_protocol_handlers()
 
90
        transport.register_transport_proto('foo')
 
91
        transport.register_lazy_transport(
 
92
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
92
93
        try:
93
 
            transport.register_transport_proto('foo')
94
 
            transport.register_lazy_transport(
95
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
96
 
            try:
97
 
                transport.get_transport('foo://fooserver/foo')
98
 
            except errors.UnsupportedProtocol, e:
99
 
                e_str = str(e)
100
 
                self.assertEquals('Unsupported protocol'
101
 
                                  ' for url "foo://fooserver/foo":'
102
 
                                  ' Unable to import library "some_lib":'
103
 
                                  ' testing missing dependency', str(e))
104
 
            else:
105
 
                self.fail('Did not raise UnsupportedProtocol')
106
 
        finally:
107
 
            # restore original values
108
 
            transport._set_protocol_handlers(saved_handlers)
 
94
            transport.get_transport_from_url('foo://fooserver/foo')
 
95
        except errors.UnsupportedProtocol, e:
 
96
            e_str = str(e)
 
97
            self.assertEquals('Unsupported protocol'
 
98
                                ' for url "foo://fooserver/foo":'
 
99
                                ' Unable to import library "some_lib":'
 
100
                                ' testing missing dependency', str(e))
 
101
        else:
 
102
            self.fail('Did not raise UnsupportedProtocol')
109
103
 
110
104
    def test_transport_fallback(self):
111
105
        """Transport with missing dependency causes no error"""
112
106
        saved_handlers = transport._get_protocol_handlers()
113
 
        try:
114
 
            transport._clear_protocol_handlers()
115
 
            transport.register_transport_proto('foo')
116
 
            transport.register_lazy_transport(
117
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
118
 
            transport.register_lazy_transport(
119
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
120
 
            t = transport.get_transport('foo://fooserver/foo')
121
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
122
 
        finally:
123
 
            transport._set_protocol_handlers(saved_handlers)
 
107
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
108
        transport._clear_protocol_handlers()
 
109
        transport.register_transport_proto('foo')
 
110
        transport.register_lazy_transport(
 
111
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
112
        transport.register_lazy_transport(
 
113
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
114
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
115
        self.assertTrue(isinstance(t, BackupTransportHandler))
124
116
 
125
117
    def test_ssh_hints(self):
126
118
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
127
119
        try:
128
 
            transport.get_transport('ssh://fooserver/foo')
 
120
            transport.get_transport_from_url('ssh://fooserver/foo')
129
121
        except errors.UnsupportedProtocol, e:
130
122
            e_str = str(e)
131
123
            self.assertEquals('Unsupported protocol'
219
211
 
220
212
    def test_coalesce_fudge(self):
221
213
        self.check([(10, 30, [(0, 10), (20, 10)]),
222
 
                    (100, 10, [(0, 10),]),
 
214
                    (100, 10, [(0, 10)]),
223
215
                   ], [(10, 10), (30, 10), (100, 10)],
224
 
                   fudge=10
225
 
                  )
 
216
                   fudge=10)
 
217
 
226
218
    def test_coalesce_max_size(self):
227
219
        self.check([(10, 20, [(0, 10), (10, 10)]),
228
220
                    (30, 50, [(0, 50)]),
229
221
                    # If one range is above max_size, it gets its own coalesced
230
222
                    # offset
231
 
                    (100, 80, [(0, 80),]),],
 
223
                    (100, 80, [(0, 80)]),],
232
224
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
233
 
                   max_size=50
234
 
                  )
 
225
                   max_size=50)
235
226
 
236
227
    def test_coalesce_no_max_size(self):
237
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
228
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
238
229
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
239
230
                  )
240
231
 
241
232
    def test_coalesce_default_limit(self):
242
233
        # By default we use a 100MB max size.
243
 
        ten_mb = 10*1024*1024
244
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
234
        ten_mb = 10 * 1024 * 1024
 
235
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
245
236
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
246
237
                   [(i*ten_mb, ten_mb) for i in range(11)])
247
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
248
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
238
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
239
                   [(i * ten_mb, ten_mb) for i in range(11)],
249
240
                   max_size=1*1024*1024*1024)
250
241
 
251
242
 
256
247
        server.start_server()
257
248
        url = server.get_url()
258
249
        self.assertTrue(url in transport.transport_list_registry)
259
 
        t = transport.get_transport(url)
 
250
        t = transport.get_transport_from_url(url)
260
251
        del t
261
252
        server.stop_server()
262
253
        self.assertFalse(url in transport.transport_list_registry)
377
368
    def test_abspath(self):
378
369
        # The abspath is always relative to the chroot_url.
379
370
        server = chroot.ChrootServer(
380
 
            transport.get_transport('memory:///foo/bar/'))
 
371
            transport.get_transport_from_url('memory:///foo/bar/'))
381
372
        self.start_server(server)
382
 
        t = transport.get_transport(server.get_url())
 
373
        t = transport.get_transport_from_url(server.get_url())
383
374
        self.assertEqual(server.get_url(), t.abspath('/'))
384
375
 
385
376
        subdir_t = t.clone('subdir')
387
378
 
388
379
    def test_clone(self):
389
380
        server = chroot.ChrootServer(
390
 
            transport.get_transport('memory:///foo/bar/'))
 
381
            transport.get_transport_from_url('memory:///foo/bar/'))
391
382
        self.start_server(server)
392
 
        t = transport.get_transport(server.get_url())
 
383
        t = transport.get_transport_from_url(server.get_url())
393
384
        # relpath from root and root path are the same
394
385
        relpath_cloned = t.clone('foo')
395
386
        abspath_cloned = t.clone('/foo')
404
395
        This is so that it is not possible to escape a chroot by doing::
405
396
            url = chroot_transport.base
406
397
            parent_url = urlutils.join(url, '..')
407
 
            new_t = transport.get_transport(parent_url)
 
398
            new_t = transport.get_transport_from_url(parent_url)
408
399
        """
409
400
        server = chroot.ChrootServer(
410
 
            transport.get_transport('memory:///path/subpath'))
 
401
            transport.get_transport_from_url('memory:///path/subpath'))
411
402
        self.start_server(server)
412
 
        t = transport.get_transport(server.get_url())
413
 
        new_t = transport.get_transport(t.base)
 
403
        t = transport.get_transport_from_url(server.get_url())
 
404
        new_t = transport.get_transport_from_url(t.base)
414
405
        self.assertEqual(t.server, new_t.server)
415
406
        self.assertEqual(t.base, new_t.base)
416
407
 
421
412
        This is so that it is not possible to escape a chroot by doing::
422
413
            url = chroot_transport.base
423
414
            parent_url = urlutils.join(url, '..')
424
 
            new_t = transport.get_transport(parent_url)
 
415
            new_t = transport.get_transport_from_url(parent_url)
425
416
        """
426
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
417
        server = chroot.ChrootServer(
 
418
            transport.get_transport_from_url('memory:///path/'))
427
419
        self.start_server(server)
428
 
        t = transport.get_transport(server.get_url())
 
420
        t = transport.get_transport_from_url(server.get_url())
429
421
        self.assertRaises(
430
422
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
431
423
 
441
433
        backing_transport = memory.MemoryTransport()
442
434
        server = chroot.ChrootServer(backing_transport)
443
435
        server.start_server()
444
 
        try:
445
 
            self.assertTrue(server.scheme
446
 
                            in transport._get_protocol_handlers().keys())
447
 
        finally:
448
 
            server.stop_server()
 
436
        self.addCleanup(server.stop_server)
 
437
        self.assertTrue(server.scheme
 
438
                        in transport._get_protocol_handlers().keys())
449
439
 
450
440
    def test_stop_server(self):
451
441
        backing_transport = memory.MemoryTransport()
459
449
        backing_transport = memory.MemoryTransport()
460
450
        server = chroot.ChrootServer(backing_transport)
461
451
        server.start_server()
462
 
        try:
463
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
464
 
        finally:
465
 
            server.stop_server()
 
452
        self.addCleanup(server.stop_server)
 
453
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
466
454
 
467
455
 
468
456
class PathFilteringDecoratorTransportTest(tests.TestCase):
471
459
    def test_abspath(self):
472
460
        # The abspath is always relative to the base of the backing transport.
473
461
        server = pathfilter.PathFilteringServer(
474
 
            transport.get_transport('memory:///foo/bar/'),
 
462
            transport.get_transport_from_url('memory:///foo/bar/'),
475
463
            lambda x: x)
476
464
        server.start_server()
477
 
        t = transport.get_transport(server.get_url())
 
465
        t = transport.get_transport_from_url(server.get_url())
478
466
        self.assertEqual(server.get_url(), t.abspath('/'))
479
467
 
480
468
        subdir_t = t.clone('subdir')
483
471
 
484
472
    def make_pf_transport(self, filter_func=None):
485
473
        """Make a PathFilteringTransport backed by a MemoryTransport.
486
 
        
 
474
 
487
475
        :param filter_func: by default this will be a no-op function.  Use this
488
476
            parameter to override it."""
489
477
        if filter_func is None:
490
478
            filter_func = lambda x: x
491
479
        server = pathfilter.PathFilteringServer(
492
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
480
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
493
481
        server.start_server()
494
482
        self.addCleanup(server.stop_server)
495
 
        return transport.get_transport(server.get_url())
 
483
        return transport.get_transport_from_url(server.get_url())
496
484
 
497
485
    def test__filter(self):
498
486
        # _filter (with an identity func as filter_func) always returns
511
499
 
512
500
    def test_filter_invocation(self):
513
501
        filter_log = []
 
502
 
514
503
        def filter(path):
515
504
            filter_log.append(path)
516
505
            return path
541
530
        otherwise) the filtering by doing::
542
531
            url = filtered_transport.base
543
532
            parent_url = urlutils.join(url, '..')
544
 
            new_t = transport.get_transport(parent_url)
 
533
            new_t = transport.get_transport_from_url(parent_url)
545
534
        """
546
535
        t = self.make_pf_transport()
547
 
        new_t = transport.get_transport(t.base)
 
536
        new_t = transport.get_transport_from_url(t.base)
548
537
        self.assertEqual(t.server, new_t.server)
549
538
        self.assertEqual(t.base, new_t.base)
550
539
 
563
552
        # connect to '.' via http which is not listable
564
553
        server = HttpServer()
565
554
        self.start_server(server)
566
 
        t = transport.get_transport('readonly+' + server.get_url())
 
555
        t = transport.get_transport_from_url('readonly+' + server.get_url())
567
556
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
568
557
        self.assertEqual(False, t.listable())
569
558
        self.assertEqual(True, t.is_readonly())
602
591
        # the url should be decorated appropriately
603
592
        self.assertStartsWith(server.get_url(), 'fakenfs+')
604
593
        # and we should be able to get a transport for it
605
 
        t = transport.get_transport(server.get_url())
 
594
        t = transport.get_transport_from_url(server.get_url())
606
595
        # which must be a FakeNFSTransportDecorator instance.
607
596
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
608
597
 
685
674
        base_url = self._server.get_url()
686
675
        url = self._adjust_url(base_url, relpath)
687
676
        # try getting the transport via the regular interface:
688
 
        t = transport.get_transport(url)
 
677
        t = transport.get_transport_from_url(url)
689
678
        # vila--20070607 if the following are commented out the test suite
690
679
        # still pass. Is this really still needed or was it a forgotten
691
680
        # temporary fix ?
696
685
        return t
697
686
 
698
687
 
 
688
class TestTransportFromPath(tests.TestCaseInTempDir):
 
689
 
 
690
    def test_with_path(self):
 
691
        t = transport.get_transport_from_path(self.test_dir)
 
692
        self.assertIsInstance(t, local.LocalTransport)
 
693
        self.assertEquals(t.base.rstrip("/"),
 
694
            urlutils.local_path_to_url(self.test_dir))
 
695
 
 
696
    def test_with_url(self):
 
697
        t = transport.get_transport_from_path("file:")
 
698
        self.assertIsInstance(t, local.LocalTransport)
 
699
        self.assertEquals(t.base.rstrip("/"),
 
700
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
701
 
 
702
 
 
703
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
704
 
 
705
    def test_with_path(self):
 
706
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
707
            self.test_dir)
 
708
 
 
709
    def test_with_url(self):
 
710
        url = urlutils.local_path_to_url(self.test_dir)
 
711
        t = transport.get_transport_from_url(url)
 
712
        self.assertIsInstance(t, local.LocalTransport)
 
713
        self.assertEquals(t.base.rstrip("/"), url)
 
714
 
 
715
 
699
716
class TestLocalTransports(tests.TestCase):
700
717
 
701
718
    def test_get_transport_from_abspath(self):
731
748
        We can't easily observe the external effect but we can at least see
732
749
        it's called.
733
750
        """
 
751
        sentinel = object()
 
752
        fdatasync = getattr(os, 'fdatasync', sentinel)
 
753
        if fdatasync is sentinel:
 
754
            raise tests.TestNotApplicable('fdatasync not supported')
734
755
        t = self.get_transport('.')
735
756
        calls = self.recordCalls(os, 'fdatasync')
736
757
        w = t.open_write_stream('out')
796
817
    def test_relpath(self):
797
818
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
798
819
 
799
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
820
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
821
            'sub')
800
822
        self.assertRaises(errors.PathNotChild, t.relpath,
801
823
                          'http://user@host.com/abs/path/sub')
802
824
        self.assertRaises(errors.PathNotChild, t.relpath,
846
868
 
847
869
    def test_reuse_same_transport(self):
848
870
        possible_transports = []
849
 
        t1 = transport.get_transport('http://foo/',
 
871
        t1 = transport.get_transport_from_url('http://foo/',
850
872
                                     possible_transports=possible_transports)
851
873
        self.assertEqual([t1], possible_transports)
852
 
        t2 = transport.get_transport('http://foo/',
 
874
        t2 = transport.get_transport_from_url('http://foo/',
853
875
                                     possible_transports=[t1])
854
876
        self.assertIs(t1, t2)
855
877
 
856
878
        # Also check that final '/' are handled correctly
857
 
        t3 = transport.get_transport('http://foo/path/')
858
 
        t4 = transport.get_transport('http://foo/path',
 
879
        t3 = transport.get_transport_from_url('http://foo/path/')
 
880
        t4 = transport.get_transport_from_url('http://foo/path',
859
881
                                     possible_transports=[t3])
860
882
        self.assertIs(t3, t4)
861
883
 
862
 
        t5 = transport.get_transport('http://foo/path')
863
 
        t6 = transport.get_transport('http://foo/path/',
 
884
        t5 = transport.get_transport_from_url('http://foo/path')
 
885
        t6 = transport.get_transport_from_url('http://foo/path/',
864
886
                                     possible_transports=[t5])
865
887
        self.assertIs(t5, t6)
866
888
 
867
889
    def test_don_t_reuse_different_transport(self):
868
 
        t1 = transport.get_transport('http://foo/path')
869
 
        t2 = transport.get_transport('http://bar/path',
 
890
        t1 = transport.get_transport_from_url('http://foo/path')
 
891
        t2 = transport.get_transport_from_url('http://bar/path',
870
892
                                     possible_transports=[t1])
871
893
        self.assertIsNot(t1, t2)
872
894
 
873
895
 
874
896
class TestTransportTrace(tests.TestCase):
875
897
 
876
 
    def test_get(self):
877
 
        t = transport.get_transport('trace+memory://')
878
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
898
    def test_decorator(self):
 
899
        t = transport.get_transport_from_url('trace+memory://')
 
900
        self.assertIsInstance(
 
901
            t, bzrlib.transport.trace.TransportTraceDecorator)
879
902
 
880
903
    def test_clone_preserves_activity(self):
881
 
        t = transport.get_transport('trace+memory://')
 
904
        t = transport.get_transport_from_url('trace+memory://')
882
905
        t2 = t.clone('.')
883
906
        self.assertTrue(t is not t2)
884
907
        self.assertTrue(t._activity is t2._activity)
888
911
    # still won't cause a test failure when the top level Transport API
889
912
    # changes; so there is little return doing that.
890
913
    def test_get(self):
891
 
        t = transport.get_transport('trace+memory:///')
 
914
        t = transport.get_transport_from_url('trace+memory:///')
892
915
        t.put_bytes('foo', 'barish')
893
916
        t.get('foo')
894
917
        expected_result = []
900
923
        self.assertEqual(expected_result, t._activity)
901
924
 
902
925
    def test_readv(self):
903
 
        t = transport.get_transport('trace+memory:///')
 
926
        t = transport.get_transport_from_url('trace+memory:///')
904
927
        t.put_bytes('foo', 'barish')
905
928
        list(t.readv('foo', [(0, 1), (3, 2)],
906
929
                     adjust_for_latency=True, upper_limit=6))
916
939
class TestSSHConnections(tests.TestCaseWithTransport):
917
940
 
918
941
    def test_bzr_connect_to_bzr_ssh(self):
919
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
942
        """get_transport of a bzr+ssh:// behaves correctly.
920
943
 
921
944
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
922
945
        """
938
961
        # SSH channel ourselves.  Surely this has already been implemented
939
962
        # elsewhere?
940
963
        started = []
 
964
 
941
965
        class StubSSHServer(stub_sftp.StubServer):
942
966
 
943
967
            test = self
949
973
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
950
974
 
951
975
                # XXX: horribly inefficient, not to mention ugly.
952
 
                # Start a thread for each of stdin/out/err, and relay bytes from
953
 
                # the subprocess to channel and vice versa.
 
976
                # Start a thread for each of stdin/out/err, and relay bytes
 
977
                # from the subprocess to channel and vice versa.
954
978
                def ferry_bytes(read, write, close):
955
979
                    while True:
956
980
                        bytes = read(1)
1024
1048
        result = http.unhtml_roughly(fake_html)
1025
1049
        self.assertEquals(len(result), 1000)
1026
1050
        self.assertStartsWith(result, " something!")
 
1051
 
 
1052
 
 
1053
class SomeDirectory(object):
 
1054
 
 
1055
    def look_up(self, name, url):
 
1056
        return "http://bar"
 
1057
 
 
1058
 
 
1059
class TestLocationToUrl(tests.TestCase):
 
1060
 
 
1061
    def test_regular_url(self):
 
1062
        self.assertEquals("file://foo", location_to_url("file://foo"))
 
1063
 
 
1064
    def test_directory(self):
 
1065
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1066
        self.addCleanup(directories.remove, "bar:")
 
1067
        self.assertEquals("http://bar", location_to_url("bar:"))
 
1068
 
 
1069
    def test_unicode_url(self):
 
1070
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1071
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1072
 
 
1073
    def test_unicode_path(self):
 
1074
        self.assertEquals("file:///foo/bar%C3%AF",
 
1075
            location_to_url("/foo/bar\xc3\xaf".decode("utf-8")))
 
1076
 
 
1077
    def test_path(self):
 
1078
        self.assertEquals("file:///foo/bar", location_to_url("/foo/bar"))
 
1079
 
 
1080
    def test_relative_file_url(self):
 
1081
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
 
1082
            location_to_url("file:bar"))
 
1083
 
 
1084
    def test_absolute_file_url(self):
 
1085
        self.assertEquals("file:///bar", location_to_url("file:/bar"))