~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Patch Queue Manager
  • Date: 2011-10-09 13:52:06 UTC
  • mfrom: (6202.1.3 revno-revision)
  • Revision ID: pqm@pqm.ubuntu.com-20111009135206-t3utsln6mtzv9eut
(jelmer) Add a --revision argument to 'bzr revno'. (Jelmer Vernooij)

Show diffs side-by-side

added

removed

Lines of Context:
28
28
    transport,
29
29
    urlutils,
30
30
    )
 
31
from bzrlib.directory_service import directories
31
32
from bzrlib.transport import (
32
33
    chroot,
33
34
    fakenfs,
34
35
    http,
35
36
    local,
 
37
    location_to_url,
36
38
    memory,
37
39
    pathfilter,
38
40
    readonly,
39
41
    )
 
42
import bzrlib.transport.trace
40
43
from bzrlib.tests import (
41
44
    features,
42
45
    test_server,
49
52
class TestTransport(tests.TestCase):
50
53
    """Test the non transport-concrete class functionality."""
51
54
 
52
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
53
 
    # of try/finally -- vila 20100205
54
 
 
55
55
    def test__get_set_protocol_handlers(self):
56
56
        handlers = transport._get_protocol_handlers()
57
 
        self.assertNotEqual([], handlers.keys( ))
58
 
        try:
59
 
            transport._clear_protocol_handlers()
60
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
61
 
        finally:
62
 
            transport._set_protocol_handlers(handlers)
 
57
        self.assertNotEqual([], handlers.keys())
 
58
        transport._clear_protocol_handlers()
 
59
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
60
        self.assertEqual([], transport._get_protocol_handlers().keys())
63
61
 
64
62
    def test_get_transport_modules(self):
65
63
        handlers = transport._get_protocol_handlers()
 
64
        self.addCleanup(transport._set_protocol_handlers, handlers)
66
65
        # don't pollute the current handlers
67
66
        transport._clear_protocol_handlers()
 
67
 
68
68
        class SampleHandler(object):
69
69
            """I exist, isnt that enough?"""
70
 
        try:
71
 
            transport._clear_protocol_handlers()
72
 
            transport.register_transport_proto('foo')
73
 
            transport.register_lazy_transport('foo',
74
 
                                              'bzrlib.tests.test_transport',
75
 
                                              'TestTransport.SampleHandler')
76
 
            transport.register_transport_proto('bar')
77
 
            transport.register_lazy_transport('bar',
78
 
                                              'bzrlib.tests.test_transport',
79
 
                                              'TestTransport.SampleHandler')
80
 
            self.assertEqual([SampleHandler.__module__,
81
 
                              'bzrlib.transport.chroot',
82
 
                              'bzrlib.transport.pathfilter'],
83
 
                             transport._get_transport_modules())
84
 
        finally:
85
 
            transport._set_protocol_handlers(handlers)
 
70
        transport._clear_protocol_handlers()
 
71
        transport.register_transport_proto('foo')
 
72
        transport.register_lazy_transport('foo',
 
73
                                            'bzrlib.tests.test_transport',
 
74
                                            'TestTransport.SampleHandler')
 
75
        transport.register_transport_proto('bar')
 
76
        transport.register_lazy_transport('bar',
 
77
                                            'bzrlib.tests.test_transport',
 
78
                                            'TestTransport.SampleHandler')
 
79
        self.assertEqual([SampleHandler.__module__,
 
80
                            'bzrlib.transport.chroot',
 
81
                            'bzrlib.transport.pathfilter'],
 
82
                            transport._get_transport_modules())
86
83
 
87
84
    def test_transport_dependency(self):
88
85
        """Transport with missing dependency causes no error"""
89
86
        saved_handlers = transport._get_protocol_handlers()
 
87
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
90
88
        # don't pollute the current handlers
91
89
        transport._clear_protocol_handlers()
 
90
        transport.register_transport_proto('foo')
 
91
        transport.register_lazy_transport(
 
92
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
92
93
        try:
93
 
            transport.register_transport_proto('foo')
94
 
            transport.register_lazy_transport(
95
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
96
 
            try:
97
 
                transport.get_transport('foo://fooserver/foo')
98
 
            except errors.UnsupportedProtocol, e:
99
 
                e_str = str(e)
100
 
                self.assertEquals('Unsupported protocol'
101
 
                                  ' for url "foo://fooserver/foo":'
102
 
                                  ' Unable to import library "some_lib":'
103
 
                                  ' testing missing dependency', str(e))
104
 
            else:
105
 
                self.fail('Did not raise UnsupportedProtocol')
106
 
        finally:
107
 
            # restore original values
108
 
            transport._set_protocol_handlers(saved_handlers)
 
94
            transport.get_transport_from_url('foo://fooserver/foo')
 
95
        except errors.UnsupportedProtocol, e:
 
96
            e_str = str(e)
 
97
            self.assertEquals('Unsupported protocol'
 
98
                                ' for url "foo://fooserver/foo":'
 
99
                                ' Unable to import library "some_lib":'
 
100
                                ' testing missing dependency', str(e))
 
101
        else:
 
102
            self.fail('Did not raise UnsupportedProtocol')
109
103
 
110
104
    def test_transport_fallback(self):
111
105
        """Transport with missing dependency causes no error"""
112
106
        saved_handlers = transport._get_protocol_handlers()
113
 
        try:
114
 
            transport._clear_protocol_handlers()
115
 
            transport.register_transport_proto('foo')
116
 
            transport.register_lazy_transport(
117
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
118
 
            transport.register_lazy_transport(
119
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
120
 
            t = transport.get_transport('foo://fooserver/foo')
121
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
122
 
        finally:
123
 
            transport._set_protocol_handlers(saved_handlers)
 
107
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
108
        transport._clear_protocol_handlers()
 
109
        transport.register_transport_proto('foo')
 
110
        transport.register_lazy_transport(
 
111
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
112
        transport.register_lazy_transport(
 
113
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
114
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
115
        self.assertTrue(isinstance(t, BackupTransportHandler))
124
116
 
125
117
    def test_ssh_hints(self):
126
118
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
127
119
        try:
128
 
            transport.get_transport('ssh://fooserver/foo')
 
120
            transport.get_transport_from_url('ssh://fooserver/foo')
129
121
        except errors.UnsupportedProtocol, e:
130
122
            e_str = str(e)
131
123
            self.assertEquals('Unsupported protocol'
146
138
        self.assertRaises(errors.ReadError, a_file.read, 40)
147
139
        a_file.close()
148
140
 
149
 
    def test__combine_paths(self):
150
 
        t = transport.Transport('/')
151
 
        self.assertEqual('/home/sarah/project/foo',
152
 
                         t._combine_paths('/home/sarah', 'project/foo'))
153
 
        self.assertEqual('/etc',
154
 
                         t._combine_paths('/home/sarah', '../../etc'))
155
 
        self.assertEqual('/etc',
156
 
                         t._combine_paths('/home/sarah', '../../../etc'))
157
 
        self.assertEqual('/etc',
158
 
                         t._combine_paths('/home/sarah', '/etc'))
159
 
 
160
141
    def test_local_abspath_non_local_transport(self):
161
142
        # the base implementation should throw
162
143
        t = memory.MemoryTransport()
219
200
 
220
201
    def test_coalesce_fudge(self):
221
202
        self.check([(10, 30, [(0, 10), (20, 10)]),
222
 
                    (100, 10, [(0, 10),]),
 
203
                    (100, 10, [(0, 10)]),
223
204
                   ], [(10, 10), (30, 10), (100, 10)],
224
 
                   fudge=10
225
 
                  )
 
205
                   fudge=10)
 
206
 
226
207
    def test_coalesce_max_size(self):
227
208
        self.check([(10, 20, [(0, 10), (10, 10)]),
228
209
                    (30, 50, [(0, 50)]),
229
210
                    # If one range is above max_size, it gets its own coalesced
230
211
                    # offset
231
 
                    (100, 80, [(0, 80),]),],
 
212
                    (100, 80, [(0, 80)]),],
232
213
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
233
 
                   max_size=50
234
 
                  )
 
214
                   max_size=50)
235
215
 
236
216
    def test_coalesce_no_max_size(self):
237
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
217
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
238
218
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
239
219
                  )
240
220
 
241
221
    def test_coalesce_default_limit(self):
242
222
        # By default we use a 100MB max size.
243
 
        ten_mb = 10*1024*1024
244
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
223
        ten_mb = 10 * 1024 * 1024
 
224
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
245
225
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
246
226
                   [(i*ten_mb, ten_mb) for i in range(11)])
247
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
248
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
227
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
228
                   [(i * ten_mb, ten_mb) for i in range(11)],
249
229
                   max_size=1*1024*1024*1024)
250
230
 
251
231
 
256
236
        server.start_server()
257
237
        url = server.get_url()
258
238
        self.assertTrue(url in transport.transport_list_registry)
259
 
        t = transport.get_transport(url)
 
239
        t = transport.get_transport_from_url(url)
260
240
        del t
261
241
        server.stop_server()
262
242
        self.assertFalse(url in transport.transport_list_registry)
377
357
    def test_abspath(self):
378
358
        # The abspath is always relative to the chroot_url.
379
359
        server = chroot.ChrootServer(
380
 
            transport.get_transport('memory:///foo/bar/'))
 
360
            transport.get_transport_from_url('memory:///foo/bar/'))
381
361
        self.start_server(server)
382
 
        t = transport.get_transport(server.get_url())
 
362
        t = transport.get_transport_from_url(server.get_url())
383
363
        self.assertEqual(server.get_url(), t.abspath('/'))
384
364
 
385
365
        subdir_t = t.clone('subdir')
387
367
 
388
368
    def test_clone(self):
389
369
        server = chroot.ChrootServer(
390
 
            transport.get_transport('memory:///foo/bar/'))
 
370
            transport.get_transport_from_url('memory:///foo/bar/'))
391
371
        self.start_server(server)
392
 
        t = transport.get_transport(server.get_url())
 
372
        t = transport.get_transport_from_url(server.get_url())
393
373
        # relpath from root and root path are the same
394
374
        relpath_cloned = t.clone('foo')
395
375
        abspath_cloned = t.clone('/foo')
404
384
        This is so that it is not possible to escape a chroot by doing::
405
385
            url = chroot_transport.base
406
386
            parent_url = urlutils.join(url, '..')
407
 
            new_t = transport.get_transport(parent_url)
 
387
            new_t = transport.get_transport_from_url(parent_url)
408
388
        """
409
389
        server = chroot.ChrootServer(
410
 
            transport.get_transport('memory:///path/subpath'))
 
390
            transport.get_transport_from_url('memory:///path/subpath'))
411
391
        self.start_server(server)
412
 
        t = transport.get_transport(server.get_url())
413
 
        new_t = transport.get_transport(t.base)
 
392
        t = transport.get_transport_from_url(server.get_url())
 
393
        new_t = transport.get_transport_from_url(t.base)
414
394
        self.assertEqual(t.server, new_t.server)
415
395
        self.assertEqual(t.base, new_t.base)
416
396
 
421
401
        This is so that it is not possible to escape a chroot by doing::
422
402
            url = chroot_transport.base
423
403
            parent_url = urlutils.join(url, '..')
424
 
            new_t = transport.get_transport(parent_url)
 
404
            new_t = transport.get_transport_from_url(parent_url)
425
405
        """
426
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
406
        server = chroot.ChrootServer(
 
407
            transport.get_transport_from_url('memory:///path/'))
427
408
        self.start_server(server)
428
 
        t = transport.get_transport(server.get_url())
 
409
        t = transport.get_transport_from_url(server.get_url())
429
410
        self.assertRaises(
430
411
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
431
412
 
441
422
        backing_transport = memory.MemoryTransport()
442
423
        server = chroot.ChrootServer(backing_transport)
443
424
        server.start_server()
444
 
        try:
445
 
            self.assertTrue(server.scheme
446
 
                            in transport._get_protocol_handlers().keys())
447
 
        finally:
448
 
            server.stop_server()
 
425
        self.addCleanup(server.stop_server)
 
426
        self.assertTrue(server.scheme
 
427
                        in transport._get_protocol_handlers().keys())
449
428
 
450
429
    def test_stop_server(self):
451
430
        backing_transport = memory.MemoryTransport()
459
438
        backing_transport = memory.MemoryTransport()
460
439
        server = chroot.ChrootServer(backing_transport)
461
440
        server.start_server()
462
 
        try:
463
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
464
 
        finally:
465
 
            server.stop_server()
 
441
        self.addCleanup(server.stop_server)
 
442
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
466
443
 
467
444
 
468
445
class PathFilteringDecoratorTransportTest(tests.TestCase):
471
448
    def test_abspath(self):
472
449
        # The abspath is always relative to the base of the backing transport.
473
450
        server = pathfilter.PathFilteringServer(
474
 
            transport.get_transport('memory:///foo/bar/'),
 
451
            transport.get_transport_from_url('memory:///foo/bar/'),
475
452
            lambda x: x)
476
453
        server.start_server()
477
 
        t = transport.get_transport(server.get_url())
 
454
        t = transport.get_transport_from_url(server.get_url())
478
455
        self.assertEqual(server.get_url(), t.abspath('/'))
479
456
 
480
457
        subdir_t = t.clone('subdir')
483
460
 
484
461
    def make_pf_transport(self, filter_func=None):
485
462
        """Make a PathFilteringTransport backed by a MemoryTransport.
486
 
        
 
463
 
487
464
        :param filter_func: by default this will be a no-op function.  Use this
488
465
            parameter to override it."""
489
466
        if filter_func is None:
490
467
            filter_func = lambda x: x
491
468
        server = pathfilter.PathFilteringServer(
492
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
469
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
493
470
        server.start_server()
494
471
        self.addCleanup(server.stop_server)
495
 
        return transport.get_transport(server.get_url())
 
472
        return transport.get_transport_from_url(server.get_url())
496
473
 
497
474
    def test__filter(self):
498
475
        # _filter (with an identity func as filter_func) always returns
511
488
 
512
489
    def test_filter_invocation(self):
513
490
        filter_log = []
 
491
 
514
492
        def filter(path):
515
493
            filter_log.append(path)
516
494
            return path
541
519
        otherwise) the filtering by doing::
542
520
            url = filtered_transport.base
543
521
            parent_url = urlutils.join(url, '..')
544
 
            new_t = transport.get_transport(parent_url)
 
522
            new_t = transport.get_transport_from_url(parent_url)
545
523
        """
546
524
        t = self.make_pf_transport()
547
 
        new_t = transport.get_transport(t.base)
 
525
        new_t = transport.get_transport_from_url(t.base)
548
526
        self.assertEqual(t.server, new_t.server)
549
527
        self.assertEqual(t.base, new_t.base)
550
528
 
563
541
        # connect to '.' via http which is not listable
564
542
        server = HttpServer()
565
543
        self.start_server(server)
566
 
        t = transport.get_transport('readonly+' + server.get_url())
 
544
        t = transport.get_transport_from_url('readonly+' + server.get_url())
567
545
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
568
546
        self.assertEqual(False, t.listable())
569
547
        self.assertEqual(True, t.is_readonly())
602
580
        # the url should be decorated appropriately
603
581
        self.assertStartsWith(server.get_url(), 'fakenfs+')
604
582
        # and we should be able to get a transport for it
605
 
        t = transport.get_transport(server.get_url())
 
583
        t = transport.get_transport_from_url(server.get_url())
606
584
        # which must be a FakeNFSTransportDecorator instance.
607
585
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
608
586
 
685
663
        base_url = self._server.get_url()
686
664
        url = self._adjust_url(base_url, relpath)
687
665
        # try getting the transport via the regular interface:
688
 
        t = transport.get_transport(url)
 
666
        t = transport.get_transport_from_url(url)
689
667
        # vila--20070607 if the following are commented out the test suite
690
668
        # still pass. Is this really still needed or was it a forgotten
691
669
        # temporary fix ?
696
674
        return t
697
675
 
698
676
 
 
677
class TestTransportFromPath(tests.TestCaseInTempDir):
 
678
 
 
679
    def test_with_path(self):
 
680
        t = transport.get_transport_from_path(self.test_dir)
 
681
        self.assertIsInstance(t, local.LocalTransport)
 
682
        self.assertEquals(t.base.rstrip("/"),
 
683
            urlutils.local_path_to_url(self.test_dir))
 
684
 
 
685
    def test_with_url(self):
 
686
        t = transport.get_transport_from_path("file:")
 
687
        self.assertIsInstance(t, local.LocalTransport)
 
688
        self.assertEquals(t.base.rstrip("/"),
 
689
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
690
 
 
691
 
 
692
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
693
 
 
694
    def test_with_path(self):
 
695
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
696
            self.test_dir)
 
697
 
 
698
    def test_with_url(self):
 
699
        url = urlutils.local_path_to_url(self.test_dir)
 
700
        t = transport.get_transport_from_url(url)
 
701
        self.assertIsInstance(t, local.LocalTransport)
 
702
        self.assertEquals(t.base.rstrip("/"), url)
 
703
 
 
704
    def test_with_url_and_segment_parameters(self):
 
705
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
 
706
        t = transport.get_transport_from_url(url)
 
707
        self.assertIsInstance(t, local.LocalTransport)
 
708
        self.assertEquals(t.base.rstrip("/"), url)
 
709
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
 
710
            f.write("data")
 
711
        self.assertTrue(t.has("afile"))
 
712
 
 
713
 
699
714
class TestLocalTransports(tests.TestCase):
700
715
 
701
716
    def test_get_transport_from_abspath(self):
731
746
        We can't easily observe the external effect but we can at least see
732
747
        it's called.
733
748
        """
 
749
        sentinel = object()
 
750
        fdatasync = getattr(os, 'fdatasync', sentinel)
 
751
        if fdatasync is sentinel:
 
752
            raise tests.TestNotApplicable('fdatasync not supported')
734
753
        t = self.get_transport('.')
735
754
        calls = self.recordCalls(os, 'fdatasync')
736
755
        w = t.open_write_stream('out')
741
760
            self.assertEquals(f.read(), 'foo')
742
761
        self.assertEquals(len(calls), 1, calls)
743
762
 
 
763
    def test_missing_directory(self):
 
764
        t = self.get_transport('.')
 
765
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
 
766
 
744
767
 
745
768
class TestWin32LocalTransport(tests.TestCase):
746
769
 
763
786
    def test_parse_url(self):
764
787
        t = transport.ConnectedTransport(
765
788
            'http://simple.example.com/home/source')
766
 
        self.assertEquals(t._host, 'simple.example.com')
767
 
        self.assertEquals(t._port, None)
768
 
        self.assertEquals(t._path, '/home/source/')
769
 
        self.assertTrue(t._user is None)
770
 
        self.assertTrue(t._password is None)
 
789
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
 
790
        self.assertEquals(t._parsed_url.port, None)
 
791
        self.assertEquals(t._parsed_url.path, '/home/source/')
 
792
        self.assertTrue(t._parsed_url.user is None)
 
793
        self.assertTrue(t._parsed_url.password is None)
771
794
 
772
795
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
773
796
 
774
797
    def test_parse_url_with_at_in_user(self):
775
798
        # Bug 228058
776
799
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
777
 
        self.assertEquals(t._user, 'user@host.com')
 
800
        self.assertEquals(t._parsed_url.user, 'user@host.com')
778
801
 
779
802
    def test_parse_quoted_url(self):
780
803
        t = transport.ConnectedTransport(
781
804
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
782
 
        self.assertEquals(t._host, 'exAmple.com')
783
 
        self.assertEquals(t._port, 2222)
784
 
        self.assertEquals(t._user, 'robey')
785
 
        self.assertEquals(t._password, 'h@t')
786
 
        self.assertEquals(t._path, '/path/')
 
805
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
 
806
        self.assertEquals(t._parsed_url.port, 2222)
 
807
        self.assertEquals(t._parsed_url.user, 'robey')
 
808
        self.assertEquals(t._parsed_url.password, 'h@t')
 
809
        self.assertEquals(t._parsed_url.path, '/path/')
787
810
 
788
811
        # Base should not keep track of the password
789
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
812
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
790
813
 
791
814
    def test_parse_invalid_url(self):
792
815
        self.assertRaises(errors.InvalidURL,
796
819
    def test_relpath(self):
797
820
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
798
821
 
799
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
822
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
823
            'sub')
800
824
        self.assertRaises(errors.PathNotChild, t.relpath,
801
825
                          'http://user@host.com/abs/path/sub')
802
826
        self.assertRaises(errors.PathNotChild, t.relpath,
815
839
 
816
840
    def test_connection_sharing_propagate_credentials(self):
817
841
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
818
 
        self.assertEquals('user', t._user)
819
 
        self.assertEquals('host.com', t._host)
 
842
        self.assertEquals('user', t._parsed_url.user)
 
843
        self.assertEquals('host.com', t._parsed_url.host)
820
844
        self.assertIs(None, t._get_connection())
821
 
        self.assertIs(None, t._password)
 
845
        self.assertIs(None, t._parsed_url.password)
822
846
        c = t.clone('subdir')
823
847
        self.assertIs(None, c._get_connection())
824
 
        self.assertIs(None, t._password)
 
848
        self.assertIs(None, t._parsed_url.password)
825
849
 
826
850
        # Simulate the user entering a password
827
851
        password = 'secret'
846
870
 
847
871
    def test_reuse_same_transport(self):
848
872
        possible_transports = []
849
 
        t1 = transport.get_transport('http://foo/',
 
873
        t1 = transport.get_transport_from_url('http://foo/',
850
874
                                     possible_transports=possible_transports)
851
875
        self.assertEqual([t1], possible_transports)
852
 
        t2 = transport.get_transport('http://foo/',
 
876
        t2 = transport.get_transport_from_url('http://foo/',
853
877
                                     possible_transports=[t1])
854
878
        self.assertIs(t1, t2)
855
879
 
856
880
        # Also check that final '/' are handled correctly
857
 
        t3 = transport.get_transport('http://foo/path/')
858
 
        t4 = transport.get_transport('http://foo/path',
 
881
        t3 = transport.get_transport_from_url('http://foo/path/')
 
882
        t4 = transport.get_transport_from_url('http://foo/path',
859
883
                                     possible_transports=[t3])
860
884
        self.assertIs(t3, t4)
861
885
 
862
 
        t5 = transport.get_transport('http://foo/path')
863
 
        t6 = transport.get_transport('http://foo/path/',
 
886
        t5 = transport.get_transport_from_url('http://foo/path')
 
887
        t6 = transport.get_transport_from_url('http://foo/path/',
864
888
                                     possible_transports=[t5])
865
889
        self.assertIs(t5, t6)
866
890
 
867
891
    def test_don_t_reuse_different_transport(self):
868
 
        t1 = transport.get_transport('http://foo/path')
869
 
        t2 = transport.get_transport('http://bar/path',
 
892
        t1 = transport.get_transport_from_url('http://foo/path')
 
893
        t2 = transport.get_transport_from_url('http://bar/path',
870
894
                                     possible_transports=[t1])
871
895
        self.assertIsNot(t1, t2)
872
896
 
873
897
 
874
898
class TestTransportTrace(tests.TestCase):
875
899
 
876
 
    def test_get(self):
877
 
        t = transport.get_transport('trace+memory://')
878
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
900
    def test_decorator(self):
 
901
        t = transport.get_transport_from_url('trace+memory://')
 
902
        self.assertIsInstance(
 
903
            t, bzrlib.transport.trace.TransportTraceDecorator)
879
904
 
880
905
    def test_clone_preserves_activity(self):
881
 
        t = transport.get_transport('trace+memory://')
 
906
        t = transport.get_transport_from_url('trace+memory://')
882
907
        t2 = t.clone('.')
883
908
        self.assertTrue(t is not t2)
884
909
        self.assertTrue(t._activity is t2._activity)
888
913
    # still won't cause a test failure when the top level Transport API
889
914
    # changes; so there is little return doing that.
890
915
    def test_get(self):
891
 
        t = transport.get_transport('trace+memory:///')
 
916
        t = transport.get_transport_from_url('trace+memory:///')
892
917
        t.put_bytes('foo', 'barish')
893
918
        t.get('foo')
894
919
        expected_result = []
900
925
        self.assertEqual(expected_result, t._activity)
901
926
 
902
927
    def test_readv(self):
903
 
        t = transport.get_transport('trace+memory:///')
 
928
        t = transport.get_transport_from_url('trace+memory:///')
904
929
        t.put_bytes('foo', 'barish')
905
930
        list(t.readv('foo', [(0, 1), (3, 2)],
906
931
                     adjust_for_latency=True, upper_limit=6))
916
941
class TestSSHConnections(tests.TestCaseWithTransport):
917
942
 
918
943
    def test_bzr_connect_to_bzr_ssh(self):
919
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
944
        """get_transport of a bzr+ssh:// behaves correctly.
920
945
 
921
946
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
922
947
        """
938
963
        # SSH channel ourselves.  Surely this has already been implemented
939
964
        # elsewhere?
940
965
        started = []
 
966
 
941
967
        class StubSSHServer(stub_sftp.StubServer):
942
968
 
943
969
            test = self
949
975
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
950
976
 
951
977
                # XXX: horribly inefficient, not to mention ugly.
952
 
                # Start a thread for each of stdin/out/err, and relay bytes from
953
 
                # the subprocess to channel and vice versa.
 
978
                # Start a thread for each of stdin/out/err, and relay bytes
 
979
                # from the subprocess to channel and vice versa.
954
980
                def ferry_bytes(read, write, close):
955
981
                    while True:
956
982
                        bytes = read(1)
1024
1050
        result = http.unhtml_roughly(fake_html)
1025
1051
        self.assertEquals(len(result), 1000)
1026
1052
        self.assertStartsWith(result, " something!")
 
1053
 
 
1054
 
 
1055
class SomeDirectory(object):
 
1056
 
 
1057
    def look_up(self, name, url):
 
1058
        return "http://bar"
 
1059
 
 
1060
 
 
1061
class TestLocationToUrl(tests.TestCase):
 
1062
 
 
1063
    def get_base_location(self):
 
1064
        path = osutils.abspath('/foo/bar')
 
1065
        if path.startswith('/'):
 
1066
            url = 'file://%s' % (path,)
 
1067
        else:
 
1068
            # On Windows, abspaths start with the drive letter, so we have to
 
1069
            # add in the extra '/'
 
1070
            url = 'file:///%s' % (path,)
 
1071
        return path, url
 
1072
 
 
1073
    def test_regular_url(self):
 
1074
        self.assertEquals("file://foo", location_to_url("file://foo"))
 
1075
 
 
1076
    def test_directory(self):
 
1077
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1078
        self.addCleanup(directories.remove, "bar:")
 
1079
        self.assertEquals("http://bar", location_to_url("bar:"))
 
1080
 
 
1081
    def test_unicode_url(self):
 
1082
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1083
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1084
 
 
1085
    def test_unicode_path(self):
 
1086
        path, url = self.get_base_location()
 
1087
        location = path + "\xc3\xaf".decode("utf-8")
 
1088
        url += '%C3%AF'
 
1089
        self.assertEquals(url, location_to_url(location))
 
1090
 
 
1091
    def test_path(self):
 
1092
        path, url = self.get_base_location()
 
1093
        self.assertEquals(url, location_to_url(path))
 
1094
 
 
1095
    def test_relative_file_url(self):
 
1096
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
 
1097
            location_to_url("file:bar"))
 
1098
 
 
1099
    def test_absolute_file_url(self):
 
1100
        self.assertEquals("file:///bar", location_to_url("file:/bar"))