~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Richard Wilbur
  • Date: 2016-02-04 19:07:28 UTC
  • mto: This revision was merged to the branch mainline in revision 6618.
  • Revision ID: richard.wilbur@gmail.com-20160204190728-p0zvfii6zase0fw7
Update COPYING.txt from the original at http://www.gnu.org/licenses/gpl-2.0.txt (the only differences were in whitespace). Thanks to Petr Stodulka for pointing out the discrepancy.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import errno
 
20
import os
19
21
import subprocess
20
22
import sys
21
23
import threading
27
29
    transport,
28
30
    urlutils,
29
31
    )
 
32
from bzrlib.directory_service import directories
30
33
from bzrlib.transport import (
31
34
    chroot,
32
35
    fakenfs,
33
36
    http,
34
37
    local,
 
38
    location_to_url,
35
39
    memory,
36
40
    pathfilter,
37
41
    readonly,
38
42
    )
 
43
import bzrlib.transport.trace
39
44
from bzrlib.tests import (
40
45
    features,
41
46
    test_server,
48
53
class TestTransport(tests.TestCase):
49
54
    """Test the non transport-concrete class functionality."""
50
55
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
56
    def test__get_set_protocol_handlers(self):
55
57
        handlers = transport._get_protocol_handlers()
56
 
        self.assertNotEqual([], handlers.keys( ))
57
 
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
60
 
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
58
        self.assertNotEqual([], handlers.keys())
 
59
        transport._clear_protocol_handlers()
 
60
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
61
        self.assertEqual([], transport._get_protocol_handlers().keys())
62
62
 
63
63
    def test_get_transport_modules(self):
64
64
        handlers = transport._get_protocol_handlers()
 
65
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
66
        # don't pollute the current handlers
66
67
        transport._clear_protocol_handlers()
 
68
 
67
69
        class SampleHandler(object):
68
70
            """I exist, isnt that enough?"""
69
 
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
79
 
            self.assertEqual([SampleHandler.__module__,
80
 
                              'bzrlib.transport.chroot',
81
 
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
83
 
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
71
        transport._clear_protocol_handlers()
 
72
        transport.register_transport_proto('foo')
 
73
        transport.register_lazy_transport('foo',
 
74
                                            'bzrlib.tests.test_transport',
 
75
                                            'TestTransport.SampleHandler')
 
76
        transport.register_transport_proto('bar')
 
77
        transport.register_lazy_transport('bar',
 
78
                                            'bzrlib.tests.test_transport',
 
79
                                            'TestTransport.SampleHandler')
 
80
        self.assertEqual([SampleHandler.__module__,
 
81
                            'bzrlib.transport.chroot',
 
82
                            'bzrlib.transport.pathfilter'],
 
83
                            transport._get_transport_modules())
85
84
 
86
85
    def test_transport_dependency(self):
87
86
        """Transport with missing dependency causes no error"""
88
87
        saved_handlers = transport._get_protocol_handlers()
 
88
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
89
        # don't pollute the current handlers
90
90
        transport._clear_protocol_handlers()
 
91
        transport.register_transport_proto('foo')
 
92
        transport.register_lazy_transport(
 
93
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
91
94
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
95
 
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
98
 
                e_str = str(e)
99
 
                self.assertEquals('Unsupported protocol'
100
 
                                  ' for url "foo://fooserver/foo":'
101
 
                                  ' Unable to import library "some_lib":'
102
 
                                  ' testing missing dependency', str(e))
103
 
            else:
104
 
                self.fail('Did not raise UnsupportedProtocol')
105
 
        finally:
106
 
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
95
            transport.get_transport_from_url('foo://fooserver/foo')
 
96
        except errors.UnsupportedProtocol, e:
 
97
            e_str = str(e)
 
98
            self.assertEquals('Unsupported protocol'
 
99
                                ' for url "foo://fooserver/foo":'
 
100
                                ' Unable to import library "some_lib":'
 
101
                                ' testing missing dependency', str(e))
 
102
        else:
 
103
            self.fail('Did not raise UnsupportedProtocol')
108
104
 
109
105
    def test_transport_fallback(self):
110
106
        """Transport with missing dependency causes no error"""
111
107
        saved_handlers = transport._get_protocol_handlers()
112
 
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
120
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
 
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
108
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
109
        transport._clear_protocol_handlers()
 
110
        transport.register_transport_proto('foo')
 
111
        transport.register_lazy_transport(
 
112
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
113
        transport.register_lazy_transport(
 
114
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
115
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
116
        self.assertTrue(isinstance(t, BackupTransportHandler))
123
117
 
124
118
    def test_ssh_hints(self):
125
119
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
126
120
        try:
127
 
            transport.get_transport('ssh://fooserver/foo')
 
121
            transport.get_transport_from_url('ssh://fooserver/foo')
128
122
        except errors.UnsupportedProtocol, e:
129
123
            e_str = str(e)
130
124
            self.assertEquals('Unsupported protocol'
145
139
        self.assertRaises(errors.ReadError, a_file.read, 40)
146
140
        a_file.close()
147
141
 
148
 
    def test__combine_paths(self):
149
 
        t = transport.Transport('/')
150
 
        self.assertEqual('/home/sarah/project/foo',
151
 
                         t._combine_paths('/home/sarah', 'project/foo'))
152
 
        self.assertEqual('/etc',
153
 
                         t._combine_paths('/home/sarah', '../../etc'))
154
 
        self.assertEqual('/etc',
155
 
                         t._combine_paths('/home/sarah', '../../../etc'))
156
 
        self.assertEqual('/etc',
157
 
                         t._combine_paths('/home/sarah', '/etc'))
158
 
 
159
142
    def test_local_abspath_non_local_transport(self):
160
143
        # the base implementation should throw
161
144
        t = memory.MemoryTransport()
218
201
 
219
202
    def test_coalesce_fudge(self):
220
203
        self.check([(10, 30, [(0, 10), (20, 10)]),
221
 
                    (100, 10, [(0, 10),]),
 
204
                    (100, 10, [(0, 10)]),
222
205
                   ], [(10, 10), (30, 10), (100, 10)],
223
 
                   fudge=10
224
 
                  )
 
206
                   fudge=10)
 
207
 
225
208
    def test_coalesce_max_size(self):
226
209
        self.check([(10, 20, [(0, 10), (10, 10)]),
227
210
                    (30, 50, [(0, 50)]),
228
211
                    # If one range is above max_size, it gets its own coalesced
229
212
                    # offset
230
 
                    (100, 80, [(0, 80),]),],
 
213
                    (100, 80, [(0, 80)]),],
231
214
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
232
 
                   max_size=50
233
 
                  )
 
215
                   max_size=50)
234
216
 
235
217
    def test_coalesce_no_max_size(self):
236
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
218
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
237
219
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
238
220
                  )
239
221
 
240
222
    def test_coalesce_default_limit(self):
241
223
        # By default we use a 100MB max size.
242
 
        ten_mb = 10*1024*1024
243
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
224
        ten_mb = 10 * 1024 * 1024
 
225
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
244
226
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
245
227
                   [(i*ten_mb, ten_mb) for i in range(11)])
246
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
247
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
228
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
229
                   [(i * ten_mb, ten_mb) for i in range(11)],
248
230
                   max_size=1*1024*1024*1024)
249
231
 
250
232
 
255
237
        server.start_server()
256
238
        url = server.get_url()
257
239
        self.assertTrue(url in transport.transport_list_registry)
258
 
        t = transport.get_transport(url)
 
240
        t = transport.get_transport_from_url(url)
259
241
        del t
260
242
        server.stop_server()
261
243
        self.assertFalse(url in transport.transport_list_registry)
376
358
    def test_abspath(self):
377
359
        # The abspath is always relative to the chroot_url.
378
360
        server = chroot.ChrootServer(
379
 
            transport.get_transport('memory:///foo/bar/'))
 
361
            transport.get_transport_from_url('memory:///foo/bar/'))
380
362
        self.start_server(server)
381
 
        t = transport.get_transport(server.get_url())
 
363
        t = transport.get_transport_from_url(server.get_url())
382
364
        self.assertEqual(server.get_url(), t.abspath('/'))
383
365
 
384
366
        subdir_t = t.clone('subdir')
386
368
 
387
369
    def test_clone(self):
388
370
        server = chroot.ChrootServer(
389
 
            transport.get_transport('memory:///foo/bar/'))
 
371
            transport.get_transport_from_url('memory:///foo/bar/'))
390
372
        self.start_server(server)
391
 
        t = transport.get_transport(server.get_url())
 
373
        t = transport.get_transport_from_url(server.get_url())
392
374
        # relpath from root and root path are the same
393
375
        relpath_cloned = t.clone('foo')
394
376
        abspath_cloned = t.clone('/foo')
403
385
        This is so that it is not possible to escape a chroot by doing::
404
386
            url = chroot_transport.base
405
387
            parent_url = urlutils.join(url, '..')
406
 
            new_t = transport.get_transport(parent_url)
 
388
            new_t = transport.get_transport_from_url(parent_url)
407
389
        """
408
390
        server = chroot.ChrootServer(
409
 
            transport.get_transport('memory:///path/subpath'))
 
391
            transport.get_transport_from_url('memory:///path/subpath'))
410
392
        self.start_server(server)
411
 
        t = transport.get_transport(server.get_url())
412
 
        new_t = transport.get_transport(t.base)
 
393
        t = transport.get_transport_from_url(server.get_url())
 
394
        new_t = transport.get_transport_from_url(t.base)
413
395
        self.assertEqual(t.server, new_t.server)
414
396
        self.assertEqual(t.base, new_t.base)
415
397
 
420
402
        This is so that it is not possible to escape a chroot by doing::
421
403
            url = chroot_transport.base
422
404
            parent_url = urlutils.join(url, '..')
423
 
            new_t = transport.get_transport(parent_url)
 
405
            new_t = transport.get_transport_from_url(parent_url)
424
406
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
407
        server = chroot.ChrootServer(
 
408
            transport.get_transport_from_url('memory:///path/'))
426
409
        self.start_server(server)
427
 
        t = transport.get_transport(server.get_url())
 
410
        t = transport.get_transport_from_url(server.get_url())
428
411
        self.assertRaises(
429
412
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
430
413
 
440
423
        backing_transport = memory.MemoryTransport()
441
424
        server = chroot.ChrootServer(backing_transport)
442
425
        server.start_server()
443
 
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
446
 
        finally:
447
 
            server.stop_server()
 
426
        self.addCleanup(server.stop_server)
 
427
        self.assertTrue(server.scheme
 
428
                        in transport._get_protocol_handlers().keys())
448
429
 
449
430
    def test_stop_server(self):
450
431
        backing_transport = memory.MemoryTransport()
458
439
        backing_transport = memory.MemoryTransport()
459
440
        server = chroot.ChrootServer(backing_transport)
460
441
        server.start_server()
461
 
        try:
462
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
463
 
        finally:
464
 
            server.stop_server()
 
442
        self.addCleanup(server.stop_server)
 
443
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
444
 
 
445
 
 
446
class TestHooks(tests.TestCase):
 
447
    """Basic tests for transport hooks"""
 
448
 
 
449
    def _get_connected_transport(self):
 
450
        return transport.ConnectedTransport("bogus:nowhere")
 
451
 
 
452
    def test_transporthooks_initialisation(self):
 
453
        """Check all expected transport hook points are set up"""
 
454
        hookpoint = transport.TransportHooks()
 
455
        self.assertTrue("post_connect" in hookpoint,
 
456
            "post_connect not in %s" % (hookpoint,))
 
457
 
 
458
    def test_post_connect(self):
 
459
        """Ensure the post_connect hook is called when _set_transport is"""
 
460
        calls = []
 
461
        transport.Transport.hooks.install_named_hook("post_connect",
 
462
            calls.append, None)
 
463
        t = self._get_connected_transport()
 
464
        self.assertLength(0, calls)
 
465
        t._set_connection("connection", "auth")
 
466
        self.assertEqual(calls, [t])
465
467
 
466
468
 
467
469
class PathFilteringDecoratorTransportTest(tests.TestCase):
470
472
    def test_abspath(self):
471
473
        # The abspath is always relative to the base of the backing transport.
472
474
        server = pathfilter.PathFilteringServer(
473
 
            transport.get_transport('memory:///foo/bar/'),
 
475
            transport.get_transport_from_url('memory:///foo/bar/'),
474
476
            lambda x: x)
475
477
        server.start_server()
476
 
        t = transport.get_transport(server.get_url())
 
478
        t = transport.get_transport_from_url(server.get_url())
477
479
        self.assertEqual(server.get_url(), t.abspath('/'))
478
480
 
479
481
        subdir_t = t.clone('subdir')
482
484
 
483
485
    def make_pf_transport(self, filter_func=None):
484
486
        """Make a PathFilteringTransport backed by a MemoryTransport.
485
 
        
 
487
 
486
488
        :param filter_func: by default this will be a no-op function.  Use this
487
489
            parameter to override it."""
488
490
        if filter_func is None:
489
491
            filter_func = lambda x: x
490
492
        server = pathfilter.PathFilteringServer(
491
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
493
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
492
494
        server.start_server()
493
495
        self.addCleanup(server.stop_server)
494
 
        return transport.get_transport(server.get_url())
 
496
        return transport.get_transport_from_url(server.get_url())
495
497
 
496
498
    def test__filter(self):
497
499
        # _filter (with an identity func as filter_func) always returns
510
512
 
511
513
    def test_filter_invocation(self):
512
514
        filter_log = []
 
515
 
513
516
        def filter(path):
514
517
            filter_log.append(path)
515
518
            return path
540
543
        otherwise) the filtering by doing::
541
544
            url = filtered_transport.base
542
545
            parent_url = urlutils.join(url, '..')
543
 
            new_t = transport.get_transport(parent_url)
 
546
            new_t = transport.get_transport_from_url(parent_url)
544
547
        """
545
548
        t = self.make_pf_transport()
546
 
        new_t = transport.get_transport(t.base)
 
549
        new_t = transport.get_transport_from_url(t.base)
547
550
        self.assertEqual(t.server, new_t.server)
548
551
        self.assertEqual(t.base, new_t.base)
549
552
 
562
565
        # connect to '.' via http which is not listable
563
566
        server = HttpServer()
564
567
        self.start_server(server)
565
 
        t = transport.get_transport('readonly+' + server.get_url())
 
568
        t = transport.get_transport_from_url('readonly+' + server.get_url())
566
569
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
567
570
        self.assertEqual(False, t.listable())
568
571
        self.assertEqual(True, t.is_readonly())
601
604
        # the url should be decorated appropriately
602
605
        self.assertStartsWith(server.get_url(), 'fakenfs+')
603
606
        # and we should be able to get a transport for it
604
 
        t = transport.get_transport(server.get_url())
 
607
        t = transport.get_transport_from_url(server.get_url())
605
608
        # which must be a FakeNFSTransportDecorator instance.
606
609
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
607
610
 
684
687
        base_url = self._server.get_url()
685
688
        url = self._adjust_url(base_url, relpath)
686
689
        # try getting the transport via the regular interface:
687
 
        t = transport.get_transport(url)
 
690
        t = transport.get_transport_from_url(url)
688
691
        # vila--20070607 if the following are commented out the test suite
689
692
        # still pass. Is this really still needed or was it a forgotten
690
693
        # temporary fix ?
695
698
        return t
696
699
 
697
700
 
 
701
class TestTransportFromPath(tests.TestCaseInTempDir):
 
702
 
 
703
    def test_with_path(self):
 
704
        t = transport.get_transport_from_path(self.test_dir)
 
705
        self.assertIsInstance(t, local.LocalTransport)
 
706
        self.assertEquals(t.base.rstrip("/"),
 
707
            urlutils.local_path_to_url(self.test_dir))
 
708
 
 
709
    def test_with_url(self):
 
710
        t = transport.get_transport_from_path("file:")
 
711
        self.assertIsInstance(t, local.LocalTransport)
 
712
        self.assertEquals(t.base.rstrip("/"),
 
713
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
714
 
 
715
 
 
716
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
717
 
 
718
    def test_with_path(self):
 
719
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
720
            self.test_dir)
 
721
 
 
722
    def test_with_url(self):
 
723
        url = urlutils.local_path_to_url(self.test_dir)
 
724
        t = transport.get_transport_from_url(url)
 
725
        self.assertIsInstance(t, local.LocalTransport)
 
726
        self.assertEquals(t.base.rstrip("/"), url)
 
727
 
 
728
    def test_with_url_and_segment_parameters(self):
 
729
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
 
730
        t = transport.get_transport_from_url(url)
 
731
        self.assertIsInstance(t, local.LocalTransport)
 
732
        self.assertEquals(t.base.rstrip("/"), url)
 
733
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
 
734
            f.write("data")
 
735
        self.assertTrue(t.has("afile"))
 
736
 
 
737
 
698
738
class TestLocalTransports(tests.TestCase):
699
739
 
700
740
    def test_get_transport_from_abspath(self):
722
762
        self.assertEquals(t.local_abspath(''), here)
723
763
 
724
764
 
 
765
class TestLocalTransportMutation(tests.TestCaseInTempDir):
 
766
 
 
767
    def test_local_transport_mkdir(self):
 
768
        here = osutils.abspath('.')
 
769
        t = transport.get_transport(here)
 
770
        t.mkdir('test')
 
771
        self.assertTrue(os.path.exists('test'))
 
772
 
 
773
    def test_local_transport_mkdir_permission_denied(self):
 
774
        # See https://bugs.launchpad.net/bzr/+bug/606537
 
775
        here = osutils.abspath('.')
 
776
        t = transport.get_transport(here)
 
777
        def fake_chmod(path, mode):
 
778
            e = OSError('permission denied')
 
779
            e.errno = errno.EPERM
 
780
            raise e
 
781
        self.overrideAttr(os, 'chmod', fake_chmod)
 
782
        t.mkdir('test')
 
783
        t.mkdir('test2', mode=0707)
 
784
        self.assertTrue(os.path.exists('test'))
 
785
        self.assertTrue(os.path.exists('test2'))
 
786
 
 
787
 
 
788
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
 
789
 
 
790
    def test_local_fdatasync_calls_fdatasync(self):
 
791
        """Check fdatasync on a stream tries to flush the data to the OS.
 
792
        
 
793
        We can't easily observe the external effect but we can at least see
 
794
        it's called.
 
795
        """
 
796
        sentinel = object()
 
797
        fdatasync = getattr(os, 'fdatasync', sentinel)
 
798
        if fdatasync is sentinel:
 
799
            raise tests.TestNotApplicable('fdatasync not supported')
 
800
        t = self.get_transport('.')
 
801
        calls = self.recordCalls(os, 'fdatasync')
 
802
        w = t.open_write_stream('out')
 
803
        w.write('foo')
 
804
        w.fdatasync()
 
805
        with open('out', 'rb') as f:
 
806
            # Should have been flushed.
 
807
            self.assertEquals(f.read(), 'foo')
 
808
        self.assertEquals(len(calls), 1, calls)
 
809
 
 
810
    def test_missing_directory(self):
 
811
        t = self.get_transport('.')
 
812
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
 
813
 
 
814
 
725
815
class TestWin32LocalTransport(tests.TestCase):
726
816
 
727
817
    def test_unc_clone_to_root(self):
 
818
        self.requireFeature(features.win32_feature)
728
819
        # Win32 UNC path like \\HOST\path
729
820
        # clone to root should stop at least at \\HOST part
730
821
        # not on \\
743
834
    def test_parse_url(self):
744
835
        t = transport.ConnectedTransport(
745
836
            'http://simple.example.com/home/source')
746
 
        self.assertEquals(t._host, 'simple.example.com')
747
 
        self.assertEquals(t._port, None)
748
 
        self.assertEquals(t._path, '/home/source/')
749
 
        self.assertTrue(t._user is None)
750
 
        self.assertTrue(t._password is None)
 
837
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
 
838
        self.assertEquals(t._parsed_url.port, None)
 
839
        self.assertEquals(t._parsed_url.path, '/home/source/')
 
840
        self.assertTrue(t._parsed_url.user is None)
 
841
        self.assertTrue(t._parsed_url.password is None)
751
842
 
752
843
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
753
844
 
754
845
    def test_parse_url_with_at_in_user(self):
755
846
        # Bug 228058
756
847
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
757
 
        self.assertEquals(t._user, 'user@host.com')
 
848
        self.assertEquals(t._parsed_url.user, 'user@host.com')
758
849
 
759
850
    def test_parse_quoted_url(self):
760
851
        t = transport.ConnectedTransport(
761
852
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
762
 
        self.assertEquals(t._host, 'exAmple.com')
763
 
        self.assertEquals(t._port, 2222)
764
 
        self.assertEquals(t._user, 'robey')
765
 
        self.assertEquals(t._password, 'h@t')
766
 
        self.assertEquals(t._path, '/path/')
 
853
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
 
854
        self.assertEquals(t._parsed_url.port, 2222)
 
855
        self.assertEquals(t._parsed_url.user, 'robey')
 
856
        self.assertEquals(t._parsed_url.password, 'h@t')
 
857
        self.assertEquals(t._parsed_url.path, '/path/')
767
858
 
768
859
        # Base should not keep track of the password
769
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
860
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
770
861
 
771
862
    def test_parse_invalid_url(self):
772
863
        self.assertRaises(errors.InvalidURL,
776
867
    def test_relpath(self):
777
868
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
778
869
 
779
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
870
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
871
            'sub')
780
872
        self.assertRaises(errors.PathNotChild, t.relpath,
781
873
                          'http://user@host.com/abs/path/sub')
782
874
        self.assertRaises(errors.PathNotChild, t.relpath,
795
887
 
796
888
    def test_connection_sharing_propagate_credentials(self):
797
889
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
798
 
        self.assertEquals('user', t._user)
799
 
        self.assertEquals('host.com', t._host)
 
890
        self.assertEquals('user', t._parsed_url.user)
 
891
        self.assertEquals('host.com', t._parsed_url.host)
800
892
        self.assertIs(None, t._get_connection())
801
 
        self.assertIs(None, t._password)
 
893
        self.assertIs(None, t._parsed_url.password)
802
894
        c = t.clone('subdir')
803
895
        self.assertIs(None, c._get_connection())
804
 
        self.assertIs(None, t._password)
 
896
        self.assertIs(None, t._parsed_url.password)
805
897
 
806
898
        # Simulate the user entering a password
807
899
        password = 'secret'
826
918
 
827
919
    def test_reuse_same_transport(self):
828
920
        possible_transports = []
829
 
        t1 = transport.get_transport('http://foo/',
 
921
        t1 = transport.get_transport_from_url('http://foo/',
830
922
                                     possible_transports=possible_transports)
831
923
        self.assertEqual([t1], possible_transports)
832
 
        t2 = transport.get_transport('http://foo/',
 
924
        t2 = transport.get_transport_from_url('http://foo/',
833
925
                                     possible_transports=[t1])
834
926
        self.assertIs(t1, t2)
835
927
 
836
928
        # Also check that final '/' are handled correctly
837
 
        t3 = transport.get_transport('http://foo/path/')
838
 
        t4 = transport.get_transport('http://foo/path',
 
929
        t3 = transport.get_transport_from_url('http://foo/path/')
 
930
        t4 = transport.get_transport_from_url('http://foo/path',
839
931
                                     possible_transports=[t3])
840
932
        self.assertIs(t3, t4)
841
933
 
842
 
        t5 = transport.get_transport('http://foo/path')
843
 
        t6 = transport.get_transport('http://foo/path/',
 
934
        t5 = transport.get_transport_from_url('http://foo/path')
 
935
        t6 = transport.get_transport_from_url('http://foo/path/',
844
936
                                     possible_transports=[t5])
845
937
        self.assertIs(t5, t6)
846
938
 
847
939
    def test_don_t_reuse_different_transport(self):
848
 
        t1 = transport.get_transport('http://foo/path')
849
 
        t2 = transport.get_transport('http://bar/path',
 
940
        t1 = transport.get_transport_from_url('http://foo/path')
 
941
        t2 = transport.get_transport_from_url('http://bar/path',
850
942
                                     possible_transports=[t1])
851
943
        self.assertIsNot(t1, t2)
852
944
 
853
945
 
854
946
class TestTransportTrace(tests.TestCase):
855
947
 
856
 
    def test_get(self):
857
 
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
948
    def test_decorator(self):
 
949
        t = transport.get_transport_from_url('trace+memory://')
 
950
        self.assertIsInstance(
 
951
            t, bzrlib.transport.trace.TransportTraceDecorator)
859
952
 
860
953
    def test_clone_preserves_activity(self):
861
 
        t = transport.get_transport('trace+memory://')
 
954
        t = transport.get_transport_from_url('trace+memory://')
862
955
        t2 = t.clone('.')
863
956
        self.assertTrue(t is not t2)
864
957
        self.assertTrue(t._activity is t2._activity)
868
961
    # still won't cause a test failure when the top level Transport API
869
962
    # changes; so there is little return doing that.
870
963
    def test_get(self):
871
 
        t = transport.get_transport('trace+memory:///')
 
964
        t = transport.get_transport_from_url('trace+memory:///')
872
965
        t.put_bytes('foo', 'barish')
873
966
        t.get('foo')
874
967
        expected_result = []
880
973
        self.assertEqual(expected_result, t._activity)
881
974
 
882
975
    def test_readv(self):
883
 
        t = transport.get_transport('trace+memory:///')
 
976
        t = transport.get_transport_from_url('trace+memory:///')
884
977
        t.put_bytes('foo', 'barish')
885
978
        list(t.readv('foo', [(0, 1), (3, 2)],
886
979
                     adjust_for_latency=True, upper_limit=6))
896
989
class TestSSHConnections(tests.TestCaseWithTransport):
897
990
 
898
991
    def test_bzr_connect_to_bzr_ssh(self):
899
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
992
        """get_transport of a bzr+ssh:// behaves correctly.
900
993
 
901
994
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
902
995
        """
918
1011
        # SSH channel ourselves.  Surely this has already been implemented
919
1012
        # elsewhere?
920
1013
        started = []
 
1014
 
921
1015
        class StubSSHServer(stub_sftp.StubServer):
922
1016
 
923
1017
            test = self
929
1023
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
930
1024
 
931
1025
                # XXX: horribly inefficient, not to mention ugly.
932
 
                # Start a thread for each of stdin/out/err, and relay bytes from
933
 
                # the subprocess to channel and vice versa.
 
1026
                # Start a thread for each of stdin/out/err, and relay bytes
 
1027
                # from the subprocess to channel and vice versa.
934
1028
                def ferry_bytes(read, write, close):
935
1029
                    while True:
936
1030
                        bytes = read(1)
1004
1098
        result = http.unhtml_roughly(fake_html)
1005
1099
        self.assertEquals(len(result), 1000)
1006
1100
        self.assertStartsWith(result, " something!")
 
1101
 
 
1102
 
 
1103
class SomeDirectory(object):
 
1104
 
 
1105
    def look_up(self, name, url):
 
1106
        return "http://bar"
 
1107
 
 
1108
 
 
1109
class TestLocationToUrl(tests.TestCase):
 
1110
 
 
1111
    def get_base_location(self):
 
1112
        path = osutils.abspath('/foo/bar')
 
1113
        if path.startswith('/'):
 
1114
            url = 'file://%s' % (path,)
 
1115
        else:
 
1116
            # On Windows, abspaths start with the drive letter, so we have to
 
1117
            # add in the extra '/'
 
1118
            url = 'file:///%s' % (path,)
 
1119
        return path, url
 
1120
 
 
1121
    def test_regular_url(self):
 
1122
        self.assertEquals("file://foo", location_to_url("file://foo"))
 
1123
 
 
1124
    def test_directory(self):
 
1125
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1126
        self.addCleanup(directories.remove, "bar:")
 
1127
        self.assertEquals("http://bar", location_to_url("bar:"))
 
1128
 
 
1129
    def test_unicode_url(self):
 
1130
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1131
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1132
 
 
1133
    def test_unicode_path(self):
 
1134
        path, url = self.get_base_location()
 
1135
        location = path + "\xc3\xaf".decode("utf-8")
 
1136
        url += '%C3%AF'
 
1137
        self.assertEquals(url, location_to_url(location))
 
1138
 
 
1139
    def test_path(self):
 
1140
        path, url = self.get_base_location()
 
1141
        self.assertEquals(url, location_to_url(path))
 
1142
 
 
1143
    def test_relative_file_url(self):
 
1144
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
 
1145
            location_to_url("file:bar"))
 
1146
 
 
1147
    def test_absolute_file_url(self):
 
1148
        self.assertEquals("file:///bar", location_to_url("file:/bar"))