~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

Abbreviate pack_stat struct format to '>6L'

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import os
19
20
import subprocess
20
21
import sys
21
22
import threading
27
28
    transport,
28
29
    urlutils,
29
30
    )
 
31
from bzrlib.directory_service import directories
30
32
from bzrlib.transport import (
31
33
    chroot,
32
34
    fakenfs,
33
35
    http,
34
36
    local,
 
37
    location_to_url,
35
38
    memory,
36
39
    pathfilter,
37
40
    readonly,
38
41
    )
 
42
import bzrlib.transport.trace
39
43
from bzrlib.tests import (
40
44
    features,
41
45
    test_server,
48
52
class TestTransport(tests.TestCase):
49
53
    """Test the non transport-concrete class functionality."""
50
54
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
55
    def test__get_set_protocol_handlers(self):
55
56
        handlers = transport._get_protocol_handlers()
56
 
        self.assertNotEqual([], handlers.keys( ))
57
 
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
60
 
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
57
        self.assertNotEqual([], handlers.keys())
 
58
        transport._clear_protocol_handlers()
 
59
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
60
        self.assertEqual([], transport._get_protocol_handlers().keys())
62
61
 
63
62
    def test_get_transport_modules(self):
64
63
        handlers = transport._get_protocol_handlers()
 
64
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
65
        # don't pollute the current handlers
66
66
        transport._clear_protocol_handlers()
 
67
 
67
68
        class SampleHandler(object):
68
69
            """I exist, isnt that enough?"""
69
 
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
79
 
            self.assertEqual([SampleHandler.__module__,
80
 
                              'bzrlib.transport.chroot',
81
 
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
83
 
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
70
        transport._clear_protocol_handlers()
 
71
        transport.register_transport_proto('foo')
 
72
        transport.register_lazy_transport('foo',
 
73
                                            'bzrlib.tests.test_transport',
 
74
                                            'TestTransport.SampleHandler')
 
75
        transport.register_transport_proto('bar')
 
76
        transport.register_lazy_transport('bar',
 
77
                                            'bzrlib.tests.test_transport',
 
78
                                            'TestTransport.SampleHandler')
 
79
        self.assertEqual([SampleHandler.__module__,
 
80
                            'bzrlib.transport.chroot',
 
81
                            'bzrlib.transport.pathfilter'],
 
82
                            transport._get_transport_modules())
85
83
 
86
84
    def test_transport_dependency(self):
87
85
        """Transport with missing dependency causes no error"""
88
86
        saved_handlers = transport._get_protocol_handlers()
 
87
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
88
        # don't pollute the current handlers
90
89
        transport._clear_protocol_handlers()
 
90
        transport.register_transport_proto('foo')
 
91
        transport.register_lazy_transport(
 
92
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
91
93
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
95
 
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
98
 
                e_str = str(e)
99
 
                self.assertEquals('Unsupported protocol'
100
 
                                  ' for url "foo://fooserver/foo":'
101
 
                                  ' Unable to import library "some_lib":'
102
 
                                  ' testing missing dependency', str(e))
103
 
            else:
104
 
                self.fail('Did not raise UnsupportedProtocol')
105
 
        finally:
106
 
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
94
            transport.get_transport_from_url('foo://fooserver/foo')
 
95
        except errors.UnsupportedProtocol, e:
 
96
            e_str = str(e)
 
97
            self.assertEquals('Unsupported protocol'
 
98
                                ' for url "foo://fooserver/foo":'
 
99
                                ' Unable to import library "some_lib":'
 
100
                                ' testing missing dependency', str(e))
 
101
        else:
 
102
            self.fail('Did not raise UnsupportedProtocol')
108
103
 
109
104
    def test_transport_fallback(self):
110
105
        """Transport with missing dependency causes no error"""
111
106
        saved_handlers = transport._get_protocol_handlers()
112
 
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
120
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
 
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
107
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
108
        transport._clear_protocol_handlers()
 
109
        transport.register_transport_proto('foo')
 
110
        transport.register_lazy_transport(
 
111
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
112
        transport.register_lazy_transport(
 
113
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
114
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
115
        self.assertTrue(isinstance(t, BackupTransportHandler))
123
116
 
124
117
    def test_ssh_hints(self):
125
118
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
126
119
        try:
127
 
            transport.get_transport('ssh://fooserver/foo')
 
120
            transport.get_transport_from_url('ssh://fooserver/foo')
128
121
        except errors.UnsupportedProtocol, e:
129
122
            e_str = str(e)
130
123
            self.assertEquals('Unsupported protocol'
145
138
        self.assertRaises(errors.ReadError, a_file.read, 40)
146
139
        a_file.close()
147
140
 
148
 
    def test__combine_paths(self):
149
 
        t = transport.Transport('/')
150
 
        self.assertEqual('/home/sarah/project/foo',
151
 
                         t._combine_paths('/home/sarah', 'project/foo'))
152
 
        self.assertEqual('/etc',
153
 
                         t._combine_paths('/home/sarah', '../../etc'))
154
 
        self.assertEqual('/etc',
155
 
                         t._combine_paths('/home/sarah', '../../../etc'))
156
 
        self.assertEqual('/etc',
157
 
                         t._combine_paths('/home/sarah', '/etc'))
158
 
 
159
141
    def test_local_abspath_non_local_transport(self):
160
142
        # the base implementation should throw
161
143
        t = memory.MemoryTransport()
218
200
 
219
201
    def test_coalesce_fudge(self):
220
202
        self.check([(10, 30, [(0, 10), (20, 10)]),
221
 
                    (100, 10, [(0, 10),]),
 
203
                    (100, 10, [(0, 10)]),
222
204
                   ], [(10, 10), (30, 10), (100, 10)],
223
 
                   fudge=10
224
 
                  )
 
205
                   fudge=10)
 
206
 
225
207
    def test_coalesce_max_size(self):
226
208
        self.check([(10, 20, [(0, 10), (10, 10)]),
227
209
                    (30, 50, [(0, 50)]),
228
210
                    # If one range is above max_size, it gets its own coalesced
229
211
                    # offset
230
 
                    (100, 80, [(0, 80),]),],
 
212
                    (100, 80, [(0, 80)]),],
231
213
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
232
 
                   max_size=50
233
 
                  )
 
214
                   max_size=50)
234
215
 
235
216
    def test_coalesce_no_max_size(self):
236
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
217
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
237
218
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
238
219
                  )
239
220
 
240
221
    def test_coalesce_default_limit(self):
241
222
        # By default we use a 100MB max size.
242
 
        ten_mb = 10*1024*1024
243
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
223
        ten_mb = 10 * 1024 * 1024
 
224
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
244
225
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
245
226
                   [(i*ten_mb, ten_mb) for i in range(11)])
246
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
247
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
227
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
228
                   [(i * ten_mb, ten_mb) for i in range(11)],
248
229
                   max_size=1*1024*1024*1024)
249
230
 
250
231
 
255
236
        server.start_server()
256
237
        url = server.get_url()
257
238
        self.assertTrue(url in transport.transport_list_registry)
258
 
        t = transport.get_transport(url)
 
239
        t = transport.get_transport_from_url(url)
259
240
        del t
260
241
        server.stop_server()
261
242
        self.assertFalse(url in transport.transport_list_registry)
376
357
    def test_abspath(self):
377
358
        # The abspath is always relative to the chroot_url.
378
359
        server = chroot.ChrootServer(
379
 
            transport.get_transport('memory:///foo/bar/'))
 
360
            transport.get_transport_from_url('memory:///foo/bar/'))
380
361
        self.start_server(server)
381
 
        t = transport.get_transport(server.get_url())
 
362
        t = transport.get_transport_from_url(server.get_url())
382
363
        self.assertEqual(server.get_url(), t.abspath('/'))
383
364
 
384
365
        subdir_t = t.clone('subdir')
386
367
 
387
368
    def test_clone(self):
388
369
        server = chroot.ChrootServer(
389
 
            transport.get_transport('memory:///foo/bar/'))
 
370
            transport.get_transport_from_url('memory:///foo/bar/'))
390
371
        self.start_server(server)
391
 
        t = transport.get_transport(server.get_url())
 
372
        t = transport.get_transport_from_url(server.get_url())
392
373
        # relpath from root and root path are the same
393
374
        relpath_cloned = t.clone('foo')
394
375
        abspath_cloned = t.clone('/foo')
403
384
        This is so that it is not possible to escape a chroot by doing::
404
385
            url = chroot_transport.base
405
386
            parent_url = urlutils.join(url, '..')
406
 
            new_t = transport.get_transport(parent_url)
 
387
            new_t = transport.get_transport_from_url(parent_url)
407
388
        """
408
389
        server = chroot.ChrootServer(
409
 
            transport.get_transport('memory:///path/subpath'))
 
390
            transport.get_transport_from_url('memory:///path/subpath'))
410
391
        self.start_server(server)
411
 
        t = transport.get_transport(server.get_url())
412
 
        new_t = transport.get_transport(t.base)
 
392
        t = transport.get_transport_from_url(server.get_url())
 
393
        new_t = transport.get_transport_from_url(t.base)
413
394
        self.assertEqual(t.server, new_t.server)
414
395
        self.assertEqual(t.base, new_t.base)
415
396
 
420
401
        This is so that it is not possible to escape a chroot by doing::
421
402
            url = chroot_transport.base
422
403
            parent_url = urlutils.join(url, '..')
423
 
            new_t = transport.get_transport(parent_url)
 
404
            new_t = transport.get_transport_from_url(parent_url)
424
405
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
406
        server = chroot.ChrootServer(
 
407
            transport.get_transport_from_url('memory:///path/'))
426
408
        self.start_server(server)
427
 
        t = transport.get_transport(server.get_url())
 
409
        t = transport.get_transport_from_url(server.get_url())
428
410
        self.assertRaises(
429
411
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
430
412
 
440
422
        backing_transport = memory.MemoryTransport()
441
423
        server = chroot.ChrootServer(backing_transport)
442
424
        server.start_server()
443
 
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
446
 
        finally:
447
 
            server.stop_server()
 
425
        self.addCleanup(server.stop_server)
 
426
        self.assertTrue(server.scheme
 
427
                        in transport._get_protocol_handlers().keys())
448
428
 
449
429
    def test_stop_server(self):
450
430
        backing_transport = memory.MemoryTransport()
458
438
        backing_transport = memory.MemoryTransport()
459
439
        server = chroot.ChrootServer(backing_transport)
460
440
        server.start_server()
461
 
        try:
462
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
463
 
        finally:
464
 
            server.stop_server()
 
441
        self.addCleanup(server.stop_server)
 
442
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
465
443
 
466
444
 
467
445
class PathFilteringDecoratorTransportTest(tests.TestCase):
470
448
    def test_abspath(self):
471
449
        # The abspath is always relative to the base of the backing transport.
472
450
        server = pathfilter.PathFilteringServer(
473
 
            transport.get_transport('memory:///foo/bar/'),
 
451
            transport.get_transport_from_url('memory:///foo/bar/'),
474
452
            lambda x: x)
475
453
        server.start_server()
476
 
        t = transport.get_transport(server.get_url())
 
454
        t = transport.get_transport_from_url(server.get_url())
477
455
        self.assertEqual(server.get_url(), t.abspath('/'))
478
456
 
479
457
        subdir_t = t.clone('subdir')
482
460
 
483
461
    def make_pf_transport(self, filter_func=None):
484
462
        """Make a PathFilteringTransport backed by a MemoryTransport.
485
 
        
 
463
 
486
464
        :param filter_func: by default this will be a no-op function.  Use this
487
465
            parameter to override it."""
488
466
        if filter_func is None:
489
467
            filter_func = lambda x: x
490
468
        server = pathfilter.PathFilteringServer(
491
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
469
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
492
470
        server.start_server()
493
471
        self.addCleanup(server.stop_server)
494
 
        return transport.get_transport(server.get_url())
 
472
        return transport.get_transport_from_url(server.get_url())
495
473
 
496
474
    def test__filter(self):
497
475
        # _filter (with an identity func as filter_func) always returns
510
488
 
511
489
    def test_filter_invocation(self):
512
490
        filter_log = []
 
491
 
513
492
        def filter(path):
514
493
            filter_log.append(path)
515
494
            return path
540
519
        otherwise) the filtering by doing::
541
520
            url = filtered_transport.base
542
521
            parent_url = urlutils.join(url, '..')
543
 
            new_t = transport.get_transport(parent_url)
 
522
            new_t = transport.get_transport_from_url(parent_url)
544
523
        """
545
524
        t = self.make_pf_transport()
546
 
        new_t = transport.get_transport(t.base)
 
525
        new_t = transport.get_transport_from_url(t.base)
547
526
        self.assertEqual(t.server, new_t.server)
548
527
        self.assertEqual(t.base, new_t.base)
549
528
 
562
541
        # connect to '.' via http which is not listable
563
542
        server = HttpServer()
564
543
        self.start_server(server)
565
 
        t = transport.get_transport('readonly+' + server.get_url())
 
544
        t = transport.get_transport_from_url('readonly+' + server.get_url())
566
545
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
567
546
        self.assertEqual(False, t.listable())
568
547
        self.assertEqual(True, t.is_readonly())
601
580
        # the url should be decorated appropriately
602
581
        self.assertStartsWith(server.get_url(), 'fakenfs+')
603
582
        # and we should be able to get a transport for it
604
 
        t = transport.get_transport(server.get_url())
 
583
        t = transport.get_transport_from_url(server.get_url())
605
584
        # which must be a FakeNFSTransportDecorator instance.
606
585
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
607
586
 
684
663
        base_url = self._server.get_url()
685
664
        url = self._adjust_url(base_url, relpath)
686
665
        # try getting the transport via the regular interface:
687
 
        t = transport.get_transport(url)
 
666
        t = transport.get_transport_from_url(url)
688
667
        # vila--20070607 if the following are commented out the test suite
689
668
        # still pass. Is this really still needed or was it a forgotten
690
669
        # temporary fix ?
695
674
        return t
696
675
 
697
676
 
 
677
class TestTransportFromPath(tests.TestCaseInTempDir):
 
678
 
 
679
    def test_with_path(self):
 
680
        t = transport.get_transport_from_path(self.test_dir)
 
681
        self.assertIsInstance(t, local.LocalTransport)
 
682
        self.assertEquals(t.base.rstrip("/"),
 
683
            urlutils.local_path_to_url(self.test_dir))
 
684
 
 
685
    def test_with_url(self):
 
686
        t = transport.get_transport_from_path("file:")
 
687
        self.assertIsInstance(t, local.LocalTransport)
 
688
        self.assertEquals(t.base.rstrip("/"),
 
689
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
690
 
 
691
 
 
692
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
693
 
 
694
    def test_with_path(self):
 
695
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
696
            self.test_dir)
 
697
 
 
698
    def test_with_url(self):
 
699
        url = urlutils.local_path_to_url(self.test_dir)
 
700
        t = transport.get_transport_from_url(url)
 
701
        self.assertIsInstance(t, local.LocalTransport)
 
702
        self.assertEquals(t.base.rstrip("/"), url)
 
703
 
 
704
    def test_with_url_and_segment_parameters(self):
 
705
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
 
706
        t = transport.get_transport_from_url(url)
 
707
        self.assertIsInstance(t, local.LocalTransport)
 
708
        self.assertEquals(t.base.rstrip("/"), url)
 
709
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
 
710
            f.write("data")
 
711
        self.assertTrue(t.has("afile"))
 
712
 
 
713
 
698
714
class TestLocalTransports(tests.TestCase):
699
715
 
700
716
    def test_get_transport_from_abspath(self):
722
738
        self.assertEquals(t.local_abspath(''), here)
723
739
 
724
740
 
 
741
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
 
742
 
 
743
    def test_local_fdatasync_calls_fdatasync(self):
 
744
        """Check fdatasync on a stream tries to flush the data to the OS.
 
745
        
 
746
        We can't easily observe the external effect but we can at least see
 
747
        it's called.
 
748
        """
 
749
        sentinel = object()
 
750
        fdatasync = getattr(os, 'fdatasync', sentinel)
 
751
        if fdatasync is sentinel:
 
752
            raise tests.TestNotApplicable('fdatasync not supported')
 
753
        t = self.get_transport('.')
 
754
        calls = self.recordCalls(os, 'fdatasync')
 
755
        w = t.open_write_stream('out')
 
756
        w.write('foo')
 
757
        w.fdatasync()
 
758
        with open('out', 'rb') as f:
 
759
            # Should have been flushed.
 
760
            self.assertEquals(f.read(), 'foo')
 
761
        self.assertEquals(len(calls), 1, calls)
 
762
 
 
763
    def test_missing_directory(self):
 
764
        t = self.get_transport('.')
 
765
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
 
766
 
 
767
 
725
768
class TestWin32LocalTransport(tests.TestCase):
726
769
 
727
770
    def test_unc_clone_to_root(self):
743
786
    def test_parse_url(self):
744
787
        t = transport.ConnectedTransport(
745
788
            'http://simple.example.com/home/source')
746
 
        self.assertEquals(t._host, 'simple.example.com')
747
 
        self.assertEquals(t._port, None)
748
 
        self.assertEquals(t._path, '/home/source/')
749
 
        self.assertTrue(t._user is None)
750
 
        self.assertTrue(t._password is None)
 
789
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
 
790
        self.assertEquals(t._parsed_url.port, None)
 
791
        self.assertEquals(t._parsed_url.path, '/home/source/')
 
792
        self.assertTrue(t._parsed_url.user is None)
 
793
        self.assertTrue(t._parsed_url.password is None)
751
794
 
752
795
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
753
796
 
754
797
    def test_parse_url_with_at_in_user(self):
755
798
        # Bug 228058
756
799
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
757
 
        self.assertEquals(t._user, 'user@host.com')
 
800
        self.assertEquals(t._parsed_url.user, 'user@host.com')
758
801
 
759
802
    def test_parse_quoted_url(self):
760
803
        t = transport.ConnectedTransport(
761
804
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
762
 
        self.assertEquals(t._host, 'exAmple.com')
763
 
        self.assertEquals(t._port, 2222)
764
 
        self.assertEquals(t._user, 'robey')
765
 
        self.assertEquals(t._password, 'h@t')
766
 
        self.assertEquals(t._path, '/path/')
 
805
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
 
806
        self.assertEquals(t._parsed_url.port, 2222)
 
807
        self.assertEquals(t._parsed_url.user, 'robey')
 
808
        self.assertEquals(t._parsed_url.password, 'h@t')
 
809
        self.assertEquals(t._parsed_url.path, '/path/')
767
810
 
768
811
        # Base should not keep track of the password
769
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
812
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
770
813
 
771
814
    def test_parse_invalid_url(self):
772
815
        self.assertRaises(errors.InvalidURL,
776
819
    def test_relpath(self):
777
820
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
778
821
 
779
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
822
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
823
            'sub')
780
824
        self.assertRaises(errors.PathNotChild, t.relpath,
781
825
                          'http://user@host.com/abs/path/sub')
782
826
        self.assertRaises(errors.PathNotChild, t.relpath,
795
839
 
796
840
    def test_connection_sharing_propagate_credentials(self):
797
841
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
798
 
        self.assertEquals('user', t._user)
799
 
        self.assertEquals('host.com', t._host)
 
842
        self.assertEquals('user', t._parsed_url.user)
 
843
        self.assertEquals('host.com', t._parsed_url.host)
800
844
        self.assertIs(None, t._get_connection())
801
 
        self.assertIs(None, t._password)
 
845
        self.assertIs(None, t._parsed_url.password)
802
846
        c = t.clone('subdir')
803
847
        self.assertIs(None, c._get_connection())
804
 
        self.assertIs(None, t._password)
 
848
        self.assertIs(None, t._parsed_url.password)
805
849
 
806
850
        # Simulate the user entering a password
807
851
        password = 'secret'
826
870
 
827
871
    def test_reuse_same_transport(self):
828
872
        possible_transports = []
829
 
        t1 = transport.get_transport('http://foo/',
 
873
        t1 = transport.get_transport_from_url('http://foo/',
830
874
                                     possible_transports=possible_transports)
831
875
        self.assertEqual([t1], possible_transports)
832
 
        t2 = transport.get_transport('http://foo/',
 
876
        t2 = transport.get_transport_from_url('http://foo/',
833
877
                                     possible_transports=[t1])
834
878
        self.assertIs(t1, t2)
835
879
 
836
880
        # Also check that final '/' are handled correctly
837
 
        t3 = transport.get_transport('http://foo/path/')
838
 
        t4 = transport.get_transport('http://foo/path',
 
881
        t3 = transport.get_transport_from_url('http://foo/path/')
 
882
        t4 = transport.get_transport_from_url('http://foo/path',
839
883
                                     possible_transports=[t3])
840
884
        self.assertIs(t3, t4)
841
885
 
842
 
        t5 = transport.get_transport('http://foo/path')
843
 
        t6 = transport.get_transport('http://foo/path/',
 
886
        t5 = transport.get_transport_from_url('http://foo/path')
 
887
        t6 = transport.get_transport_from_url('http://foo/path/',
844
888
                                     possible_transports=[t5])
845
889
        self.assertIs(t5, t6)
846
890
 
847
891
    def test_don_t_reuse_different_transport(self):
848
 
        t1 = transport.get_transport('http://foo/path')
849
 
        t2 = transport.get_transport('http://bar/path',
 
892
        t1 = transport.get_transport_from_url('http://foo/path')
 
893
        t2 = transport.get_transport_from_url('http://bar/path',
850
894
                                     possible_transports=[t1])
851
895
        self.assertIsNot(t1, t2)
852
896
 
853
897
 
854
898
class TestTransportTrace(tests.TestCase):
855
899
 
856
 
    def test_get(self):
857
 
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
900
    def test_decorator(self):
 
901
        t = transport.get_transport_from_url('trace+memory://')
 
902
        self.assertIsInstance(
 
903
            t, bzrlib.transport.trace.TransportTraceDecorator)
859
904
 
860
905
    def test_clone_preserves_activity(self):
861
 
        t = transport.get_transport('trace+memory://')
 
906
        t = transport.get_transport_from_url('trace+memory://')
862
907
        t2 = t.clone('.')
863
908
        self.assertTrue(t is not t2)
864
909
        self.assertTrue(t._activity is t2._activity)
868
913
    # still won't cause a test failure when the top level Transport API
869
914
    # changes; so there is little return doing that.
870
915
    def test_get(self):
871
 
        t = transport.get_transport('trace+memory:///')
 
916
        t = transport.get_transport_from_url('trace+memory:///')
872
917
        t.put_bytes('foo', 'barish')
873
918
        t.get('foo')
874
919
        expected_result = []
880
925
        self.assertEqual(expected_result, t._activity)
881
926
 
882
927
    def test_readv(self):
883
 
        t = transport.get_transport('trace+memory:///')
 
928
        t = transport.get_transport_from_url('trace+memory:///')
884
929
        t.put_bytes('foo', 'barish')
885
930
        list(t.readv('foo', [(0, 1), (3, 2)],
886
931
                     adjust_for_latency=True, upper_limit=6))
896
941
class TestSSHConnections(tests.TestCaseWithTransport):
897
942
 
898
943
    def test_bzr_connect_to_bzr_ssh(self):
899
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
944
        """get_transport of a bzr+ssh:// behaves correctly.
900
945
 
901
946
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
902
947
        """
918
963
        # SSH channel ourselves.  Surely this has already been implemented
919
964
        # elsewhere?
920
965
        started = []
 
966
 
921
967
        class StubSSHServer(stub_sftp.StubServer):
922
968
 
923
969
            test = self
929
975
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
930
976
 
931
977
                # XXX: horribly inefficient, not to mention ugly.
932
 
                # Start a thread for each of stdin/out/err, and relay bytes from
933
 
                # the subprocess to channel and vice versa.
 
978
                # Start a thread for each of stdin/out/err, and relay bytes
 
979
                # from the subprocess to channel and vice versa.
934
980
                def ferry_bytes(read, write, close):
935
981
                    while True:
936
982
                        bytes = read(1)
1004
1050
        result = http.unhtml_roughly(fake_html)
1005
1051
        self.assertEquals(len(result), 1000)
1006
1052
        self.assertStartsWith(result, " something!")
 
1053
 
 
1054
 
 
1055
class SomeDirectory(object):
 
1056
 
 
1057
    def look_up(self, name, url):
 
1058
        return "http://bar"
 
1059
 
 
1060
 
 
1061
class TestLocationToUrl(tests.TestCase):
 
1062
 
 
1063
    def get_base_location(self):
 
1064
        path = osutils.abspath('/foo/bar')
 
1065
        if path.startswith('/'):
 
1066
            url = 'file://%s' % (path,)
 
1067
        else:
 
1068
            # On Windows, abspaths start with the drive letter, so we have to
 
1069
            # add in the extra '/'
 
1070
            url = 'file:///%s' % (path,)
 
1071
        return path, url
 
1072
 
 
1073
    def test_regular_url(self):
 
1074
        self.assertEquals("file://foo", location_to_url("file://foo"))
 
1075
 
 
1076
    def test_directory(self):
 
1077
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1078
        self.addCleanup(directories.remove, "bar:")
 
1079
        self.assertEquals("http://bar", location_to_url("bar:"))
 
1080
 
 
1081
    def test_unicode_url(self):
 
1082
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1083
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1084
 
 
1085
    def test_unicode_path(self):
 
1086
        path, url = self.get_base_location()
 
1087
        location = path + "\xc3\xaf".decode("utf-8")
 
1088
        url += '%C3%AF'
 
1089
        self.assertEquals(url, location_to_url(location))
 
1090
 
 
1091
    def test_path(self):
 
1092
        path, url = self.get_base_location()
 
1093
        self.assertEquals(url, location_to_url(path))
 
1094
 
 
1095
    def test_relative_file_url(self):
 
1096
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
 
1097
            location_to_url("file:bar"))
 
1098
 
 
1099
    def test_absolute_file_url(self):
 
1100
        self.assertEquals("file:///bar", location_to_url("file:/bar"))