~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Martin Packman
  • Date: 2012-09-05 20:22:17 UTC
  • mfrom: (6437.63.6 2.5)
  • mto: This revision was merged to the branch mainline in revision 6558.
  • Revision ID: martin.packman@canonical.com-20120905202217-79io6livc1q0p66u
Merge 2.5 into bzr.dev

Show diffs side-by-side

added

removed

Lines of Context:
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import errno
19
20
import os
20
21
import subprocess
21
22
import sys
28
29
    transport,
29
30
    urlutils,
30
31
    )
 
32
from bzrlib.directory_service import directories
31
33
from bzrlib.transport import (
32
34
    chroot,
33
35
    fakenfs,
34
36
    http,
35
37
    local,
 
38
    location_to_url,
36
39
    memory,
37
40
    pathfilter,
38
41
    readonly,
39
42
    )
 
43
import bzrlib.transport.trace
40
44
from bzrlib.tests import (
41
45
    features,
42
46
    test_server,
49
53
class TestTransport(tests.TestCase):
50
54
    """Test the non transport-concrete class functionality."""
51
55
 
52
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
53
 
    # of try/finally -- vila 20100205
54
 
 
55
56
    def test__get_set_protocol_handlers(self):
56
57
        handlers = transport._get_protocol_handlers()
57
 
        self.assertNotEqual([], handlers.keys( ))
58
 
        try:
59
 
            transport._clear_protocol_handlers()
60
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
61
 
        finally:
62
 
            transport._set_protocol_handlers(handlers)
 
58
        self.assertNotEqual([], handlers.keys())
 
59
        transport._clear_protocol_handlers()
 
60
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
61
        self.assertEqual([], transport._get_protocol_handlers().keys())
63
62
 
64
63
    def test_get_transport_modules(self):
65
64
        handlers = transport._get_protocol_handlers()
 
65
        self.addCleanup(transport._set_protocol_handlers, handlers)
66
66
        # don't pollute the current handlers
67
67
        transport._clear_protocol_handlers()
 
68
 
68
69
        class SampleHandler(object):
69
70
            """I exist, isnt that enough?"""
70
 
        try:
71
 
            transport._clear_protocol_handlers()
72
 
            transport.register_transport_proto('foo')
73
 
            transport.register_lazy_transport('foo',
74
 
                                              'bzrlib.tests.test_transport',
75
 
                                              'TestTransport.SampleHandler')
76
 
            transport.register_transport_proto('bar')
77
 
            transport.register_lazy_transport('bar',
78
 
                                              'bzrlib.tests.test_transport',
79
 
                                              'TestTransport.SampleHandler')
80
 
            self.assertEqual([SampleHandler.__module__,
81
 
                              'bzrlib.transport.chroot',
82
 
                              'bzrlib.transport.pathfilter'],
83
 
                             transport._get_transport_modules())
84
 
        finally:
85
 
            transport._set_protocol_handlers(handlers)
 
71
        transport._clear_protocol_handlers()
 
72
        transport.register_transport_proto('foo')
 
73
        transport.register_lazy_transport('foo',
 
74
                                            'bzrlib.tests.test_transport',
 
75
                                            'TestTransport.SampleHandler')
 
76
        transport.register_transport_proto('bar')
 
77
        transport.register_lazy_transport('bar',
 
78
                                            'bzrlib.tests.test_transport',
 
79
                                            'TestTransport.SampleHandler')
 
80
        self.assertEqual([SampleHandler.__module__,
 
81
                            'bzrlib.transport.chroot',
 
82
                            'bzrlib.transport.pathfilter'],
 
83
                            transport._get_transport_modules())
86
84
 
87
85
    def test_transport_dependency(self):
88
86
        """Transport with missing dependency causes no error"""
89
87
        saved_handlers = transport._get_protocol_handlers()
 
88
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
90
89
        # don't pollute the current handlers
91
90
        transport._clear_protocol_handlers()
 
91
        transport.register_transport_proto('foo')
 
92
        transport.register_lazy_transport(
 
93
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
92
94
        try:
93
 
            transport.register_transport_proto('foo')
94
 
            transport.register_lazy_transport(
95
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
96
 
            try:
97
 
                transport.get_transport('foo://fooserver/foo')
98
 
            except errors.UnsupportedProtocol, e:
99
 
                e_str = str(e)
100
 
                self.assertEquals('Unsupported protocol'
101
 
                                  ' for url "foo://fooserver/foo":'
102
 
                                  ' Unable to import library "some_lib":'
103
 
                                  ' testing missing dependency', str(e))
104
 
            else:
105
 
                self.fail('Did not raise UnsupportedProtocol')
106
 
        finally:
107
 
            # restore original values
108
 
            transport._set_protocol_handlers(saved_handlers)
 
95
            transport.get_transport_from_url('foo://fooserver/foo')
 
96
        except errors.UnsupportedProtocol, e:
 
97
            e_str = str(e)
 
98
            self.assertEquals('Unsupported protocol'
 
99
                                ' for url "foo://fooserver/foo":'
 
100
                                ' Unable to import library "some_lib":'
 
101
                                ' testing missing dependency', str(e))
 
102
        else:
 
103
            self.fail('Did not raise UnsupportedProtocol')
109
104
 
110
105
    def test_transport_fallback(self):
111
106
        """Transport with missing dependency causes no error"""
112
107
        saved_handlers = transport._get_protocol_handlers()
113
 
        try:
114
 
            transport._clear_protocol_handlers()
115
 
            transport.register_transport_proto('foo')
116
 
            transport.register_lazy_transport(
117
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
118
 
            transport.register_lazy_transport(
119
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
120
 
            t = transport.get_transport('foo://fooserver/foo')
121
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
122
 
        finally:
123
 
            transport._set_protocol_handlers(saved_handlers)
 
108
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
109
        transport._clear_protocol_handlers()
 
110
        transport.register_transport_proto('foo')
 
111
        transport.register_lazy_transport(
 
112
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
113
        transport.register_lazy_transport(
 
114
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
115
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
116
        self.assertTrue(isinstance(t, BackupTransportHandler))
124
117
 
125
118
    def test_ssh_hints(self):
126
119
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
127
120
        try:
128
 
            transport.get_transport('ssh://fooserver/foo')
 
121
            transport.get_transport_from_url('ssh://fooserver/foo')
129
122
        except errors.UnsupportedProtocol, e:
130
123
            e_str = str(e)
131
124
            self.assertEquals('Unsupported protocol'
146
139
        self.assertRaises(errors.ReadError, a_file.read, 40)
147
140
        a_file.close()
148
141
 
149
 
    def test__combine_paths(self):
150
 
        t = transport.Transport('/')
151
 
        self.assertEqual('/home/sarah/project/foo',
152
 
                         t._combine_paths('/home/sarah', 'project/foo'))
153
 
        self.assertEqual('/etc',
154
 
                         t._combine_paths('/home/sarah', '../../etc'))
155
 
        self.assertEqual('/etc',
156
 
                         t._combine_paths('/home/sarah', '../../../etc'))
157
 
        self.assertEqual('/etc',
158
 
                         t._combine_paths('/home/sarah', '/etc'))
159
 
 
160
142
    def test_local_abspath_non_local_transport(self):
161
143
        # the base implementation should throw
162
144
        t = memory.MemoryTransport()
219
201
 
220
202
    def test_coalesce_fudge(self):
221
203
        self.check([(10, 30, [(0, 10), (20, 10)]),
222
 
                    (100, 10, [(0, 10),]),
 
204
                    (100, 10, [(0, 10)]),
223
205
                   ], [(10, 10), (30, 10), (100, 10)],
224
 
                   fudge=10
225
 
                  )
 
206
                   fudge=10)
 
207
 
226
208
    def test_coalesce_max_size(self):
227
209
        self.check([(10, 20, [(0, 10), (10, 10)]),
228
210
                    (30, 50, [(0, 50)]),
229
211
                    # If one range is above max_size, it gets its own coalesced
230
212
                    # offset
231
 
                    (100, 80, [(0, 80),]),],
 
213
                    (100, 80, [(0, 80)]),],
232
214
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
233
 
                   max_size=50
234
 
                  )
 
215
                   max_size=50)
235
216
 
236
217
    def test_coalesce_no_max_size(self):
237
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
218
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
238
219
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
239
220
                  )
240
221
 
241
222
    def test_coalesce_default_limit(self):
242
223
        # By default we use a 100MB max size.
243
 
        ten_mb = 10*1024*1024
244
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
224
        ten_mb = 10 * 1024 * 1024
 
225
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
245
226
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
246
227
                   [(i*ten_mb, ten_mb) for i in range(11)])
247
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
248
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
228
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
229
                   [(i * ten_mb, ten_mb) for i in range(11)],
249
230
                   max_size=1*1024*1024*1024)
250
231
 
251
232
 
256
237
        server.start_server()
257
238
        url = server.get_url()
258
239
        self.assertTrue(url in transport.transport_list_registry)
259
 
        t = transport.get_transport(url)
 
240
        t = transport.get_transport_from_url(url)
260
241
        del t
261
242
        server.stop_server()
262
243
        self.assertFalse(url in transport.transport_list_registry)
377
358
    def test_abspath(self):
378
359
        # The abspath is always relative to the chroot_url.
379
360
        server = chroot.ChrootServer(
380
 
            transport.get_transport('memory:///foo/bar/'))
 
361
            transport.get_transport_from_url('memory:///foo/bar/'))
381
362
        self.start_server(server)
382
 
        t = transport.get_transport(server.get_url())
 
363
        t = transport.get_transport_from_url(server.get_url())
383
364
        self.assertEqual(server.get_url(), t.abspath('/'))
384
365
 
385
366
        subdir_t = t.clone('subdir')
387
368
 
388
369
    def test_clone(self):
389
370
        server = chroot.ChrootServer(
390
 
            transport.get_transport('memory:///foo/bar/'))
 
371
            transport.get_transport_from_url('memory:///foo/bar/'))
391
372
        self.start_server(server)
392
 
        t = transport.get_transport(server.get_url())
 
373
        t = transport.get_transport_from_url(server.get_url())
393
374
        # relpath from root and root path are the same
394
375
        relpath_cloned = t.clone('foo')
395
376
        abspath_cloned = t.clone('/foo')
404
385
        This is so that it is not possible to escape a chroot by doing::
405
386
            url = chroot_transport.base
406
387
            parent_url = urlutils.join(url, '..')
407
 
            new_t = transport.get_transport(parent_url)
 
388
            new_t = transport.get_transport_from_url(parent_url)
408
389
        """
409
390
        server = chroot.ChrootServer(
410
 
            transport.get_transport('memory:///path/subpath'))
 
391
            transport.get_transport_from_url('memory:///path/subpath'))
411
392
        self.start_server(server)
412
 
        t = transport.get_transport(server.get_url())
413
 
        new_t = transport.get_transport(t.base)
 
393
        t = transport.get_transport_from_url(server.get_url())
 
394
        new_t = transport.get_transport_from_url(t.base)
414
395
        self.assertEqual(t.server, new_t.server)
415
396
        self.assertEqual(t.base, new_t.base)
416
397
 
421
402
        This is so that it is not possible to escape a chroot by doing::
422
403
            url = chroot_transport.base
423
404
            parent_url = urlutils.join(url, '..')
424
 
            new_t = transport.get_transport(parent_url)
 
405
            new_t = transport.get_transport_from_url(parent_url)
425
406
        """
426
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
407
        server = chroot.ChrootServer(
 
408
            transport.get_transport_from_url('memory:///path/'))
427
409
        self.start_server(server)
428
 
        t = transport.get_transport(server.get_url())
 
410
        t = transport.get_transport_from_url(server.get_url())
429
411
        self.assertRaises(
430
412
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
431
413
 
441
423
        backing_transport = memory.MemoryTransport()
442
424
        server = chroot.ChrootServer(backing_transport)
443
425
        server.start_server()
444
 
        try:
445
 
            self.assertTrue(server.scheme
446
 
                            in transport._get_protocol_handlers().keys())
447
 
        finally:
448
 
            server.stop_server()
 
426
        self.addCleanup(server.stop_server)
 
427
        self.assertTrue(server.scheme
 
428
                        in transport._get_protocol_handlers().keys())
449
429
 
450
430
    def test_stop_server(self):
451
431
        backing_transport = memory.MemoryTransport()
459
439
        backing_transport = memory.MemoryTransport()
460
440
        server = chroot.ChrootServer(backing_transport)
461
441
        server.start_server()
462
 
        try:
463
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
464
 
        finally:
465
 
            server.stop_server()
 
442
        self.addCleanup(server.stop_server)
 
443
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
444
 
 
445
 
 
446
class TestHooks(tests.TestCase):
 
447
    """Basic tests for transport hooks"""
 
448
 
 
449
    def _get_connected_transport(self):
 
450
        return transport.ConnectedTransport("bogus:nowhere")
 
451
 
 
452
    def test_transporthooks_initialisation(self):
 
453
        """Check all expected transport hook points are set up"""
 
454
        hookpoint = transport.TransportHooks()
 
455
        self.assertTrue("post_connect" in hookpoint,
 
456
            "post_connect not in %s" % (hookpoint,))
 
457
 
 
458
    def test_post_connect(self):
 
459
        """Ensure the post_connect hook is called when _set_transport is"""
 
460
        calls = []
 
461
        transport.Transport.hooks.install_named_hook("post_connect",
 
462
            calls.append, None)
 
463
        t = self._get_connected_transport()
 
464
        self.assertLength(0, calls)
 
465
        t._set_connection("connection", "auth")
 
466
        self.assertEqual(calls, [t])
466
467
 
467
468
 
468
469
class PathFilteringDecoratorTransportTest(tests.TestCase):
471
472
    def test_abspath(self):
472
473
        # The abspath is always relative to the base of the backing transport.
473
474
        server = pathfilter.PathFilteringServer(
474
 
            transport.get_transport('memory:///foo/bar/'),
 
475
            transport.get_transport_from_url('memory:///foo/bar/'),
475
476
            lambda x: x)
476
477
        server.start_server()
477
 
        t = transport.get_transport(server.get_url())
 
478
        t = transport.get_transport_from_url(server.get_url())
478
479
        self.assertEqual(server.get_url(), t.abspath('/'))
479
480
 
480
481
        subdir_t = t.clone('subdir')
483
484
 
484
485
    def make_pf_transport(self, filter_func=None):
485
486
        """Make a PathFilteringTransport backed by a MemoryTransport.
486
 
        
 
487
 
487
488
        :param filter_func: by default this will be a no-op function.  Use this
488
489
            parameter to override it."""
489
490
        if filter_func is None:
490
491
            filter_func = lambda x: x
491
492
        server = pathfilter.PathFilteringServer(
492
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
493
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
493
494
        server.start_server()
494
495
        self.addCleanup(server.stop_server)
495
 
        return transport.get_transport(server.get_url())
 
496
        return transport.get_transport_from_url(server.get_url())
496
497
 
497
498
    def test__filter(self):
498
499
        # _filter (with an identity func as filter_func) always returns
511
512
 
512
513
    def test_filter_invocation(self):
513
514
        filter_log = []
 
515
 
514
516
        def filter(path):
515
517
            filter_log.append(path)
516
518
            return path
541
543
        otherwise) the filtering by doing::
542
544
            url = filtered_transport.base
543
545
            parent_url = urlutils.join(url, '..')
544
 
            new_t = transport.get_transport(parent_url)
 
546
            new_t = transport.get_transport_from_url(parent_url)
545
547
        """
546
548
        t = self.make_pf_transport()
547
 
        new_t = transport.get_transport(t.base)
 
549
        new_t = transport.get_transport_from_url(t.base)
548
550
        self.assertEqual(t.server, new_t.server)
549
551
        self.assertEqual(t.base, new_t.base)
550
552
 
563
565
        # connect to '.' via http which is not listable
564
566
        server = HttpServer()
565
567
        self.start_server(server)
566
 
        t = transport.get_transport('readonly+' + server.get_url())
 
568
        t = transport.get_transport_from_url('readonly+' + server.get_url())
567
569
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
568
570
        self.assertEqual(False, t.listable())
569
571
        self.assertEqual(True, t.is_readonly())
602
604
        # the url should be decorated appropriately
603
605
        self.assertStartsWith(server.get_url(), 'fakenfs+')
604
606
        # and we should be able to get a transport for it
605
 
        t = transport.get_transport(server.get_url())
 
607
        t = transport.get_transport_from_url(server.get_url())
606
608
        # which must be a FakeNFSTransportDecorator instance.
607
609
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
608
610
 
685
687
        base_url = self._server.get_url()
686
688
        url = self._adjust_url(base_url, relpath)
687
689
        # try getting the transport via the regular interface:
688
 
        t = transport.get_transport(url)
 
690
        t = transport.get_transport_from_url(url)
689
691
        # vila--20070607 if the following are commented out the test suite
690
692
        # still pass. Is this really still needed or was it a forgotten
691
693
        # temporary fix ?
696
698
        return t
697
699
 
698
700
 
 
701
class TestTransportFromPath(tests.TestCaseInTempDir):
 
702
 
 
703
    def test_with_path(self):
 
704
        t = transport.get_transport_from_path(self.test_dir)
 
705
        self.assertIsInstance(t, local.LocalTransport)
 
706
        self.assertEquals(t.base.rstrip("/"),
 
707
            urlutils.local_path_to_url(self.test_dir))
 
708
 
 
709
    def test_with_url(self):
 
710
        t = transport.get_transport_from_path("file:")
 
711
        self.assertIsInstance(t, local.LocalTransport)
 
712
        self.assertEquals(t.base.rstrip("/"),
 
713
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
714
 
 
715
 
 
716
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
717
 
 
718
    def test_with_path(self):
 
719
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
720
            self.test_dir)
 
721
 
 
722
    def test_with_url(self):
 
723
        url = urlutils.local_path_to_url(self.test_dir)
 
724
        t = transport.get_transport_from_url(url)
 
725
        self.assertIsInstance(t, local.LocalTransport)
 
726
        self.assertEquals(t.base.rstrip("/"), url)
 
727
 
 
728
    def test_with_url_and_segment_parameters(self):
 
729
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
 
730
        t = transport.get_transport_from_url(url)
 
731
        self.assertIsInstance(t, local.LocalTransport)
 
732
        self.assertEquals(t.base.rstrip("/"), url)
 
733
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
 
734
            f.write("data")
 
735
        self.assertTrue(t.has("afile"))
 
736
 
 
737
 
699
738
class TestLocalTransports(tests.TestCase):
700
739
 
701
740
    def test_get_transport_from_abspath(self):
723
762
        self.assertEquals(t.local_abspath(''), here)
724
763
 
725
764
 
 
765
class TestLocalTransportMutation(tests.TestCaseInTempDir):
 
766
 
 
767
    def test_local_transport_mkdir(self):
 
768
        here = osutils.abspath('.')
 
769
        t = transport.get_transport(here)
 
770
        t.mkdir('test')
 
771
        self.assertTrue(os.path.exists('test'))
 
772
 
 
773
    def test_local_transport_mkdir_permission_denied(self):
 
774
        # See https://bugs.launchpad.net/bzr/+bug/606537
 
775
        here = osutils.abspath('.')
 
776
        t = transport.get_transport(here)
 
777
        def fake_chmod(path, mode):
 
778
            e = OSError('permission denied')
 
779
            e.errno = errno.EPERM
 
780
            raise e
 
781
        self.overrideAttr(os, 'chmod', fake_chmod)
 
782
        t.mkdir('test')
 
783
        t.mkdir('test2', mode=0707)
 
784
        self.assertTrue(os.path.exists('test'))
 
785
        self.assertTrue(os.path.exists('test2'))
 
786
 
 
787
 
726
788
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
727
789
 
728
790
    def test_local_fdatasync_calls_fdatasync(self):
731
793
        We can't easily observe the external effect but we can at least see
732
794
        it's called.
733
795
        """
 
796
        sentinel = object()
 
797
        fdatasync = getattr(os, 'fdatasync', sentinel)
 
798
        if fdatasync is sentinel:
 
799
            raise tests.TestNotApplicable('fdatasync not supported')
734
800
        t = self.get_transport('.')
735
801
        calls = self.recordCalls(os, 'fdatasync')
736
802
        w = t.open_write_stream('out')
741
807
            self.assertEquals(f.read(), 'foo')
742
808
        self.assertEquals(len(calls), 1, calls)
743
809
 
 
810
    def test_missing_directory(self):
 
811
        t = self.get_transport('.')
 
812
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
 
813
 
744
814
 
745
815
class TestWin32LocalTransport(tests.TestCase):
746
816
 
763
833
    def test_parse_url(self):
764
834
        t = transport.ConnectedTransport(
765
835
            'http://simple.example.com/home/source')
766
 
        self.assertEquals(t._host, 'simple.example.com')
767
 
        self.assertEquals(t._port, None)
768
 
        self.assertEquals(t._path, '/home/source/')
769
 
        self.assertTrue(t._user is None)
770
 
        self.assertTrue(t._password is None)
 
836
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
 
837
        self.assertEquals(t._parsed_url.port, None)
 
838
        self.assertEquals(t._parsed_url.path, '/home/source/')
 
839
        self.assertTrue(t._parsed_url.user is None)
 
840
        self.assertTrue(t._parsed_url.password is None)
771
841
 
772
842
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
773
843
 
774
844
    def test_parse_url_with_at_in_user(self):
775
845
        # Bug 228058
776
846
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
777
 
        self.assertEquals(t._user, 'user@host.com')
 
847
        self.assertEquals(t._parsed_url.user, 'user@host.com')
778
848
 
779
849
    def test_parse_quoted_url(self):
780
850
        t = transport.ConnectedTransport(
781
851
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
782
 
        self.assertEquals(t._host, 'exAmple.com')
783
 
        self.assertEquals(t._port, 2222)
784
 
        self.assertEquals(t._user, 'robey')
785
 
        self.assertEquals(t._password, 'h@t')
786
 
        self.assertEquals(t._path, '/path/')
 
852
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
 
853
        self.assertEquals(t._parsed_url.port, 2222)
 
854
        self.assertEquals(t._parsed_url.user, 'robey')
 
855
        self.assertEquals(t._parsed_url.password, 'h@t')
 
856
        self.assertEquals(t._parsed_url.path, '/path/')
787
857
 
788
858
        # Base should not keep track of the password
789
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
859
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
790
860
 
791
861
    def test_parse_invalid_url(self):
792
862
        self.assertRaises(errors.InvalidURL,
796
866
    def test_relpath(self):
797
867
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
798
868
 
799
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
869
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
870
            'sub')
800
871
        self.assertRaises(errors.PathNotChild, t.relpath,
801
872
                          'http://user@host.com/abs/path/sub')
802
873
        self.assertRaises(errors.PathNotChild, t.relpath,
815
886
 
816
887
    def test_connection_sharing_propagate_credentials(self):
817
888
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
818
 
        self.assertEquals('user', t._user)
819
 
        self.assertEquals('host.com', t._host)
 
889
        self.assertEquals('user', t._parsed_url.user)
 
890
        self.assertEquals('host.com', t._parsed_url.host)
820
891
        self.assertIs(None, t._get_connection())
821
 
        self.assertIs(None, t._password)
 
892
        self.assertIs(None, t._parsed_url.password)
822
893
        c = t.clone('subdir')
823
894
        self.assertIs(None, c._get_connection())
824
 
        self.assertIs(None, t._password)
 
895
        self.assertIs(None, t._parsed_url.password)
825
896
 
826
897
        # Simulate the user entering a password
827
898
        password = 'secret'
846
917
 
847
918
    def test_reuse_same_transport(self):
848
919
        possible_transports = []
849
 
        t1 = transport.get_transport('http://foo/',
 
920
        t1 = transport.get_transport_from_url('http://foo/',
850
921
                                     possible_transports=possible_transports)
851
922
        self.assertEqual([t1], possible_transports)
852
 
        t2 = transport.get_transport('http://foo/',
 
923
        t2 = transport.get_transport_from_url('http://foo/',
853
924
                                     possible_transports=[t1])
854
925
        self.assertIs(t1, t2)
855
926
 
856
927
        # Also check that final '/' are handled correctly
857
 
        t3 = transport.get_transport('http://foo/path/')
858
 
        t4 = transport.get_transport('http://foo/path',
 
928
        t3 = transport.get_transport_from_url('http://foo/path/')
 
929
        t4 = transport.get_transport_from_url('http://foo/path',
859
930
                                     possible_transports=[t3])
860
931
        self.assertIs(t3, t4)
861
932
 
862
 
        t5 = transport.get_transport('http://foo/path')
863
 
        t6 = transport.get_transport('http://foo/path/',
 
933
        t5 = transport.get_transport_from_url('http://foo/path')
 
934
        t6 = transport.get_transport_from_url('http://foo/path/',
864
935
                                     possible_transports=[t5])
865
936
        self.assertIs(t5, t6)
866
937
 
867
938
    def test_don_t_reuse_different_transport(self):
868
 
        t1 = transport.get_transport('http://foo/path')
869
 
        t2 = transport.get_transport('http://bar/path',
 
939
        t1 = transport.get_transport_from_url('http://foo/path')
 
940
        t2 = transport.get_transport_from_url('http://bar/path',
870
941
                                     possible_transports=[t1])
871
942
        self.assertIsNot(t1, t2)
872
943
 
873
944
 
874
945
class TestTransportTrace(tests.TestCase):
875
946
 
876
 
    def test_get(self):
877
 
        t = transport.get_transport('trace+memory://')
878
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
947
    def test_decorator(self):
 
948
        t = transport.get_transport_from_url('trace+memory://')
 
949
        self.assertIsInstance(
 
950
            t, bzrlib.transport.trace.TransportTraceDecorator)
879
951
 
880
952
    def test_clone_preserves_activity(self):
881
 
        t = transport.get_transport('trace+memory://')
 
953
        t = transport.get_transport_from_url('trace+memory://')
882
954
        t2 = t.clone('.')
883
955
        self.assertTrue(t is not t2)
884
956
        self.assertTrue(t._activity is t2._activity)
888
960
    # still won't cause a test failure when the top level Transport API
889
961
    # changes; so there is little return doing that.
890
962
    def test_get(self):
891
 
        t = transport.get_transport('trace+memory:///')
 
963
        t = transport.get_transport_from_url('trace+memory:///')
892
964
        t.put_bytes('foo', 'barish')
893
965
        t.get('foo')
894
966
        expected_result = []
900
972
        self.assertEqual(expected_result, t._activity)
901
973
 
902
974
    def test_readv(self):
903
 
        t = transport.get_transport('trace+memory:///')
 
975
        t = transport.get_transport_from_url('trace+memory:///')
904
976
        t.put_bytes('foo', 'barish')
905
977
        list(t.readv('foo', [(0, 1), (3, 2)],
906
978
                     adjust_for_latency=True, upper_limit=6))
916
988
class TestSSHConnections(tests.TestCaseWithTransport):
917
989
 
918
990
    def test_bzr_connect_to_bzr_ssh(self):
919
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
991
        """get_transport of a bzr+ssh:// behaves correctly.
920
992
 
921
993
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
922
994
        """
938
1010
        # SSH channel ourselves.  Surely this has already been implemented
939
1011
        # elsewhere?
940
1012
        started = []
 
1013
 
941
1014
        class StubSSHServer(stub_sftp.StubServer):
942
1015
 
943
1016
            test = self
949
1022
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
950
1023
 
951
1024
                # XXX: horribly inefficient, not to mention ugly.
952
 
                # Start a thread for each of stdin/out/err, and relay bytes from
953
 
                # the subprocess to channel and vice versa.
 
1025
                # Start a thread for each of stdin/out/err, and relay bytes
 
1026
                # from the subprocess to channel and vice versa.
954
1027
                def ferry_bytes(read, write, close):
955
1028
                    while True:
956
1029
                        bytes = read(1)
1024
1097
        result = http.unhtml_roughly(fake_html)
1025
1098
        self.assertEquals(len(result), 1000)
1026
1099
        self.assertStartsWith(result, " something!")
 
1100
 
 
1101
 
 
1102
class SomeDirectory(object):
 
1103
 
 
1104
    def look_up(self, name, url):
 
1105
        return "http://bar"
 
1106
 
 
1107
 
 
1108
class TestLocationToUrl(tests.TestCase):
 
1109
 
 
1110
    def get_base_location(self):
 
1111
        path = osutils.abspath('/foo/bar')
 
1112
        if path.startswith('/'):
 
1113
            url = 'file://%s' % (path,)
 
1114
        else:
 
1115
            # On Windows, abspaths start with the drive letter, so we have to
 
1116
            # add in the extra '/'
 
1117
            url = 'file:///%s' % (path,)
 
1118
        return path, url
 
1119
 
 
1120
    def test_regular_url(self):
 
1121
        self.assertEquals("file://foo", location_to_url("file://foo"))
 
1122
 
 
1123
    def test_directory(self):
 
1124
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1125
        self.addCleanup(directories.remove, "bar:")
 
1126
        self.assertEquals("http://bar", location_to_url("bar:"))
 
1127
 
 
1128
    def test_unicode_url(self):
 
1129
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1130
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1131
 
 
1132
    def test_unicode_path(self):
 
1133
        path, url = self.get_base_location()
 
1134
        location = path + "\xc3\xaf".decode("utf-8")
 
1135
        url += '%C3%AF'
 
1136
        self.assertEquals(url, location_to_url(location))
 
1137
 
 
1138
    def test_path(self):
 
1139
        path, url = self.get_base_location()
 
1140
        self.assertEquals(url, location_to_url(path))
 
1141
 
 
1142
    def test_relative_file_url(self):
 
1143
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
 
1144
            location_to_url("file:bar"))
 
1145
 
 
1146
    def test_absolute_file_url(self):
 
1147
        self.assertEquals("file:///bar", location_to_url("file:/bar"))