~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-02-16 00:53:04 UTC
  • mfrom: (4997.1.3 find-branches-local)
  • Revision ID: pqm@pqm.ubuntu.com-20100216005304-1p8xafkhiizh6ugi
(Jelmer) Add BzrDir.list_branches().

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
28
28
    transport,
29
29
    urlutils,
30
30
    )
31
 
from bzrlib.directory_service import directories
32
31
from bzrlib.transport import (
33
32
    chroot,
34
33
    fakenfs,
35
 
    http,
36
34
    local,
37
 
    location_to_url,
38
35
    memory,
39
36
    pathfilter,
40
37
    readonly,
41
38
    )
42
 
import bzrlib.transport.trace
43
39
from bzrlib.tests import (
44
40
    features,
45
41
    test_server,
52
48
class TestTransport(tests.TestCase):
53
49
    """Test the non transport-concrete class functionality."""
54
50
 
 
51
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
 
52
    # of try/finally -- vila 20100205
 
53
 
55
54
    def test__get_set_protocol_handlers(self):
56
55
        handlers = transport._get_protocol_handlers()
57
 
        self.assertNotEqual([], handlers.keys())
58
 
        transport._clear_protocol_handlers()
59
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
60
 
        self.assertEqual([], transport._get_protocol_handlers().keys())
 
56
        self.assertNotEqual([], handlers.keys( ))
 
57
        try:
 
58
            transport._clear_protocol_handlers()
 
59
            self.assertEqual([], transport._get_protocol_handlers().keys())
 
60
        finally:
 
61
            transport._set_protocol_handlers(handlers)
61
62
 
62
63
    def test_get_transport_modules(self):
63
64
        handlers = transport._get_protocol_handlers()
64
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
65
        # don't pollute the current handlers
66
66
        transport._clear_protocol_handlers()
67
 
 
68
67
        class SampleHandler(object):
69
68
            """I exist, isnt that enough?"""
70
 
        transport._clear_protocol_handlers()
71
 
        transport.register_transport_proto('foo')
72
 
        transport.register_lazy_transport('foo',
73
 
                                            'bzrlib.tests.test_transport',
74
 
                                            'TestTransport.SampleHandler')
75
 
        transport.register_transport_proto('bar')
76
 
        transport.register_lazy_transport('bar',
77
 
                                            'bzrlib.tests.test_transport',
78
 
                                            'TestTransport.SampleHandler')
79
 
        self.assertEqual([SampleHandler.__module__,
80
 
                            'bzrlib.transport.chroot',
81
 
                            'bzrlib.transport.pathfilter'],
82
 
                            transport._get_transport_modules())
 
69
        try:
 
70
            transport._clear_protocol_handlers()
 
71
            transport.register_transport_proto('foo')
 
72
            transport.register_lazy_transport('foo',
 
73
                                              'bzrlib.tests.test_transport',
 
74
                                              'TestTransport.SampleHandler')
 
75
            transport.register_transport_proto('bar')
 
76
            transport.register_lazy_transport('bar',
 
77
                                              'bzrlib.tests.test_transport',
 
78
                                              'TestTransport.SampleHandler')
 
79
            self.assertEqual([SampleHandler.__module__,
 
80
                              'bzrlib.transport.chroot',
 
81
                              'bzrlib.transport.pathfilter'],
 
82
                             transport._get_transport_modules())
 
83
        finally:
 
84
            transport._set_protocol_handlers(handlers)
83
85
 
84
86
    def test_transport_dependency(self):
85
87
        """Transport with missing dependency causes no error"""
86
88
        saved_handlers = transport._get_protocol_handlers()
87
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
88
89
        # don't pollute the current handlers
89
90
        transport._clear_protocol_handlers()
90
 
        transport.register_transport_proto('foo')
91
 
        transport.register_lazy_transport(
92
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
93
91
        try:
94
 
            transport.get_transport_from_url('foo://fooserver/foo')
95
 
        except errors.UnsupportedProtocol, e:
96
 
            e_str = str(e)
97
 
            self.assertEquals('Unsupported protocol'
98
 
                                ' for url "foo://fooserver/foo":'
99
 
                                ' Unable to import library "some_lib":'
100
 
                                ' testing missing dependency', str(e))
101
 
        else:
102
 
            self.fail('Did not raise UnsupportedProtocol')
 
92
            transport.register_transport_proto('foo')
 
93
            transport.register_lazy_transport(
 
94
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
95
            try:
 
96
                transport.get_transport('foo://fooserver/foo')
 
97
            except errors.UnsupportedProtocol, e:
 
98
                e_str = str(e)
 
99
                self.assertEquals('Unsupported protocol'
 
100
                                  ' for url "foo://fooserver/foo":'
 
101
                                  ' Unable to import library "some_lib":'
 
102
                                  ' testing missing dependency', str(e))
 
103
            else:
 
104
                self.fail('Did not raise UnsupportedProtocol')
 
105
        finally:
 
106
            # restore original values
 
107
            transport._set_protocol_handlers(saved_handlers)
103
108
 
104
109
    def test_transport_fallback(self):
105
110
        """Transport with missing dependency causes no error"""
106
111
        saved_handlers = transport._get_protocol_handlers()
107
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
108
 
        transport._clear_protocol_handlers()
109
 
        transport.register_transport_proto('foo')
110
 
        transport.register_lazy_transport(
111
 
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
112
 
        transport.register_lazy_transport(
113
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
114
 
        t = transport.get_transport_from_url('foo://fooserver/foo')
115
 
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
112
        try:
 
113
            transport._clear_protocol_handlers()
 
114
            transport.register_transport_proto('foo')
 
115
            transport.register_lazy_transport(
 
116
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
117
            transport.register_lazy_transport(
 
118
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
119
            t = transport.get_transport('foo://fooserver/foo')
 
120
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
121
        finally:
 
122
            transport._set_protocol_handlers(saved_handlers)
116
123
 
117
124
    def test_ssh_hints(self):
118
125
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
119
126
        try:
120
 
            transport.get_transport_from_url('ssh://fooserver/foo')
 
127
            transport.get_transport('ssh://fooserver/foo')
121
128
        except errors.UnsupportedProtocol, e:
122
129
            e_str = str(e)
123
130
            self.assertEquals('Unsupported protocol'
138
145
        self.assertRaises(errors.ReadError, a_file.read, 40)
139
146
        a_file.close()
140
147
 
 
148
    def test__combine_paths(self):
 
149
        t = transport.Transport('/')
 
150
        self.assertEqual('/home/sarah/project/foo',
 
151
                         t._combine_paths('/home/sarah', 'project/foo'))
 
152
        self.assertEqual('/etc',
 
153
                         t._combine_paths('/home/sarah', '../../etc'))
 
154
        self.assertEqual('/etc',
 
155
                         t._combine_paths('/home/sarah', '../../../etc'))
 
156
        self.assertEqual('/etc',
 
157
                         t._combine_paths('/home/sarah', '/etc'))
 
158
 
141
159
    def test_local_abspath_non_local_transport(self):
142
160
        # the base implementation should throw
143
161
        t = memory.MemoryTransport()
200
218
 
201
219
    def test_coalesce_fudge(self):
202
220
        self.check([(10, 30, [(0, 10), (20, 10)]),
203
 
                    (100, 10, [(0, 10)]),
 
221
                    (100, 10, [(0, 10),]),
204
222
                   ], [(10, 10), (30, 10), (100, 10)],
205
 
                   fudge=10)
206
 
 
 
223
                   fudge=10
 
224
                  )
207
225
    def test_coalesce_max_size(self):
208
226
        self.check([(10, 20, [(0, 10), (10, 10)]),
209
227
                    (30, 50, [(0, 50)]),
210
228
                    # If one range is above max_size, it gets its own coalesced
211
229
                    # offset
212
 
                    (100, 80, [(0, 80)]),],
 
230
                    (100, 80, [(0, 80),]),],
213
231
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
214
 
                   max_size=50)
 
232
                   max_size=50
 
233
                  )
215
234
 
216
235
    def test_coalesce_no_max_size(self):
217
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
 
236
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
218
237
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
219
238
                  )
220
239
 
221
240
    def test_coalesce_default_limit(self):
222
241
        # By default we use a 100MB max size.
223
 
        ten_mb = 10 * 1024 * 1024
224
 
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
242
        ten_mb = 10*1024*1024
 
243
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
225
244
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
226
245
                   [(i*ten_mb, ten_mb) for i in range(11)])
227
 
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
228
 
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
246
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
247
                   [(i*ten_mb, ten_mb) for i in range(11)],
229
248
                   max_size=1*1024*1024*1024)
230
249
 
231
250
 
236
255
        server.start_server()
237
256
        url = server.get_url()
238
257
        self.assertTrue(url in transport.transport_list_registry)
239
 
        t = transport.get_transport_from_url(url)
 
258
        t = transport.get_transport(url)
240
259
        del t
241
260
        server.stop_server()
242
261
        self.assertFalse(url in transport.transport_list_registry)
357
376
    def test_abspath(self):
358
377
        # The abspath is always relative to the chroot_url.
359
378
        server = chroot.ChrootServer(
360
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
379
            transport.get_transport('memory:///foo/bar/'))
361
380
        self.start_server(server)
362
 
        t = transport.get_transport_from_url(server.get_url())
 
381
        t = transport.get_transport(server.get_url())
363
382
        self.assertEqual(server.get_url(), t.abspath('/'))
364
383
 
365
384
        subdir_t = t.clone('subdir')
367
386
 
368
387
    def test_clone(self):
369
388
        server = chroot.ChrootServer(
370
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
389
            transport.get_transport('memory:///foo/bar/'))
371
390
        self.start_server(server)
372
 
        t = transport.get_transport_from_url(server.get_url())
 
391
        t = transport.get_transport(server.get_url())
373
392
        # relpath from root and root path are the same
374
393
        relpath_cloned = t.clone('foo')
375
394
        abspath_cloned = t.clone('/foo')
384
403
        This is so that it is not possible to escape a chroot by doing::
385
404
            url = chroot_transport.base
386
405
            parent_url = urlutils.join(url, '..')
387
 
            new_t = transport.get_transport_from_url(parent_url)
 
406
            new_t = transport.get_transport(parent_url)
388
407
        """
389
408
        server = chroot.ChrootServer(
390
 
            transport.get_transport_from_url('memory:///path/subpath'))
 
409
            transport.get_transport('memory:///path/subpath'))
391
410
        self.start_server(server)
392
 
        t = transport.get_transport_from_url(server.get_url())
393
 
        new_t = transport.get_transport_from_url(t.base)
 
411
        t = transport.get_transport(server.get_url())
 
412
        new_t = transport.get_transport(t.base)
394
413
        self.assertEqual(t.server, new_t.server)
395
414
        self.assertEqual(t.base, new_t.base)
396
415
 
401
420
        This is so that it is not possible to escape a chroot by doing::
402
421
            url = chroot_transport.base
403
422
            parent_url = urlutils.join(url, '..')
404
 
            new_t = transport.get_transport_from_url(parent_url)
 
423
            new_t = transport.get_transport(parent_url)
405
424
        """
406
 
        server = chroot.ChrootServer(
407
 
            transport.get_transport_from_url('memory:///path/'))
 
425
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
408
426
        self.start_server(server)
409
 
        t = transport.get_transport_from_url(server.get_url())
 
427
        t = transport.get_transport(server.get_url())
410
428
        self.assertRaises(
411
429
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
412
430
 
422
440
        backing_transport = memory.MemoryTransport()
423
441
        server = chroot.ChrootServer(backing_transport)
424
442
        server.start_server()
425
 
        self.addCleanup(server.stop_server)
426
 
        self.assertTrue(server.scheme
427
 
                        in transport._get_protocol_handlers().keys())
 
443
        try:
 
444
            self.assertTrue(server.scheme
 
445
                            in transport._get_protocol_handlers().keys())
 
446
        finally:
 
447
            server.stop_server()
428
448
 
429
449
    def test_stop_server(self):
430
450
        backing_transport = memory.MemoryTransport()
438
458
        backing_transport = memory.MemoryTransport()
439
459
        server = chroot.ChrootServer(backing_transport)
440
460
        server.start_server()
441
 
        self.addCleanup(server.stop_server)
442
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
461
        try:
 
462
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
463
        finally:
 
464
            server.stop_server()
443
465
 
444
466
 
445
467
class PathFilteringDecoratorTransportTest(tests.TestCase):
448
470
    def test_abspath(self):
449
471
        # The abspath is always relative to the base of the backing transport.
450
472
        server = pathfilter.PathFilteringServer(
451
 
            transport.get_transport_from_url('memory:///foo/bar/'),
 
473
            transport.get_transport('memory:///foo/bar/'),
452
474
            lambda x: x)
453
475
        server.start_server()
454
 
        t = transport.get_transport_from_url(server.get_url())
 
476
        t = transport.get_transport(server.get_url())
455
477
        self.assertEqual(server.get_url(), t.abspath('/'))
456
478
 
457
479
        subdir_t = t.clone('subdir')
460
482
 
461
483
    def make_pf_transport(self, filter_func=None):
462
484
        """Make a PathFilteringTransport backed by a MemoryTransport.
463
 
 
 
485
        
464
486
        :param filter_func: by default this will be a no-op function.  Use this
465
487
            parameter to override it."""
466
488
        if filter_func is None:
467
489
            filter_func = lambda x: x
468
490
        server = pathfilter.PathFilteringServer(
469
 
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
 
491
            transport.get_transport('memory:///foo/bar/'), filter_func)
470
492
        server.start_server()
471
493
        self.addCleanup(server.stop_server)
472
 
        return transport.get_transport_from_url(server.get_url())
 
494
        return transport.get_transport(server.get_url())
473
495
 
474
496
    def test__filter(self):
475
497
        # _filter (with an identity func as filter_func) always returns
488
510
 
489
511
    def test_filter_invocation(self):
490
512
        filter_log = []
491
 
 
492
513
        def filter(path):
493
514
            filter_log.append(path)
494
515
            return path
519
540
        otherwise) the filtering by doing::
520
541
            url = filtered_transport.base
521
542
            parent_url = urlutils.join(url, '..')
522
 
            new_t = transport.get_transport_from_url(parent_url)
 
543
            new_t = transport.get_transport(parent_url)
523
544
        """
524
545
        t = self.make_pf_transport()
525
 
        new_t = transport.get_transport_from_url(t.base)
 
546
        new_t = transport.get_transport(t.base)
526
547
        self.assertEqual(t.server, new_t.server)
527
548
        self.assertEqual(t.base, new_t.base)
528
549
 
541
562
        # connect to '.' via http which is not listable
542
563
        server = HttpServer()
543
564
        self.start_server(server)
544
 
        t = transport.get_transport_from_url('readonly+' + server.get_url())
545
 
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
 
565
        t = transport.get_transport('readonly+' + server.get_url())
 
566
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
546
567
        self.assertEqual(False, t.listable())
547
568
        self.assertEqual(True, t.is_readonly())
548
569
 
580
601
        # the url should be decorated appropriately
581
602
        self.assertStartsWith(server.get_url(), 'fakenfs+')
582
603
        # and we should be able to get a transport for it
583
 
        t = transport.get_transport_from_url(server.get_url())
 
604
        t = transport.get_transport(server.get_url())
584
605
        # which must be a FakeNFSTransportDecorator instance.
585
606
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
586
607
 
663
684
        base_url = self._server.get_url()
664
685
        url = self._adjust_url(base_url, relpath)
665
686
        # try getting the transport via the regular interface:
666
 
        t = transport.get_transport_from_url(url)
 
687
        t = transport.get_transport(url)
667
688
        # vila--20070607 if the following are commented out the test suite
668
689
        # still pass. Is this really still needed or was it a forgotten
669
690
        # temporary fix ?
674
695
        return t
675
696
 
676
697
 
677
 
class TestTransportFromPath(tests.TestCaseInTempDir):
678
 
 
679
 
    def test_with_path(self):
680
 
        t = transport.get_transport_from_path(self.test_dir)
681
 
        self.assertIsInstance(t, local.LocalTransport)
682
 
        self.assertEquals(t.base.rstrip("/"),
683
 
            urlutils.local_path_to_url(self.test_dir))
684
 
 
685
 
    def test_with_url(self):
686
 
        t = transport.get_transport_from_path("file:")
687
 
        self.assertIsInstance(t, local.LocalTransport)
688
 
        self.assertEquals(t.base.rstrip("/"),
689
 
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
690
 
 
691
 
 
692
 
class TestTransportFromUrl(tests.TestCaseInTempDir):
693
 
 
694
 
    def test_with_path(self):
695
 
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
696
 
            self.test_dir)
697
 
 
698
 
    def test_with_url(self):
699
 
        url = urlutils.local_path_to_url(self.test_dir)
700
 
        t = transport.get_transport_from_url(url)
701
 
        self.assertIsInstance(t, local.LocalTransport)
702
 
        self.assertEquals(t.base.rstrip("/"), url)
703
 
 
704
 
    def test_with_url_and_segment_parameters(self):
705
 
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
706
 
        t = transport.get_transport_from_url(url)
707
 
        self.assertIsInstance(t, local.LocalTransport)
708
 
        self.assertEquals(t.base.rstrip("/"), url)
709
 
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
710
 
            f.write("data")
711
 
        self.assertTrue(t.has("afile"))
712
 
 
713
 
 
714
698
class TestLocalTransports(tests.TestCase):
715
699
 
716
700
    def test_get_transport_from_abspath(self):
738
722
        self.assertEquals(t.local_abspath(''), here)
739
723
 
740
724
 
741
 
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
742
 
 
743
 
    def test_local_fdatasync_calls_fdatasync(self):
744
 
        """Check fdatasync on a stream tries to flush the data to the OS.
745
 
        
746
 
        We can't easily observe the external effect but we can at least see
747
 
        it's called.
748
 
        """
749
 
        sentinel = object()
750
 
        fdatasync = getattr(os, 'fdatasync', sentinel)
751
 
        if fdatasync is sentinel:
752
 
            raise tests.TestNotApplicable('fdatasync not supported')
753
 
        t = self.get_transport('.')
754
 
        calls = self.recordCalls(os, 'fdatasync')
755
 
        w = t.open_write_stream('out')
756
 
        w.write('foo')
757
 
        w.fdatasync()
758
 
        with open('out', 'rb') as f:
759
 
            # Should have been flushed.
760
 
            self.assertEquals(f.read(), 'foo')
761
 
        self.assertEquals(len(calls), 1, calls)
762
 
 
763
 
 
764
725
class TestWin32LocalTransport(tests.TestCase):
765
726
 
766
727
    def test_unc_clone_to_root(self):
782
743
    def test_parse_url(self):
783
744
        t = transport.ConnectedTransport(
784
745
            'http://simple.example.com/home/source')
785
 
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
786
 
        self.assertEquals(t._parsed_url.port, None)
787
 
        self.assertEquals(t._parsed_url.path, '/home/source/')
788
 
        self.assertTrue(t._parsed_url.user is None)
789
 
        self.assertTrue(t._parsed_url.password is None)
 
746
        self.assertEquals(t._host, 'simple.example.com')
 
747
        self.assertEquals(t._port, None)
 
748
        self.assertEquals(t._path, '/home/source/')
 
749
        self.failUnless(t._user is None)
 
750
        self.failUnless(t._password is None)
790
751
 
791
752
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
792
753
 
793
754
    def test_parse_url_with_at_in_user(self):
794
755
        # Bug 228058
795
756
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
796
 
        self.assertEquals(t._parsed_url.user, 'user@host.com')
 
757
        self.assertEquals(t._user, 'user@host.com')
797
758
 
798
759
    def test_parse_quoted_url(self):
799
760
        t = transport.ConnectedTransport(
800
761
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
801
 
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
802
 
        self.assertEquals(t._parsed_url.port, 2222)
803
 
        self.assertEquals(t._parsed_url.user, 'robey')
804
 
        self.assertEquals(t._parsed_url.password, 'h@t')
805
 
        self.assertEquals(t._parsed_url.path, '/path/')
 
762
        self.assertEquals(t._host, 'exAmple.com')
 
763
        self.assertEquals(t._port, 2222)
 
764
        self.assertEquals(t._user, 'robey')
 
765
        self.assertEquals(t._password, 'h@t')
 
766
        self.assertEquals(t._path, '/path/')
806
767
 
807
768
        # Base should not keep track of the password
808
 
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
 
769
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
809
770
 
810
771
    def test_parse_invalid_url(self):
811
772
        self.assertRaises(errors.InvalidURL,
815
776
    def test_relpath(self):
816
777
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
817
778
 
818
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
819
 
            'sub')
 
779
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
820
780
        self.assertRaises(errors.PathNotChild, t.relpath,
821
781
                          'http://user@host.com/abs/path/sub')
822
782
        self.assertRaises(errors.PathNotChild, t.relpath,
835
795
 
836
796
    def test_connection_sharing_propagate_credentials(self):
837
797
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
838
 
        self.assertEquals('user', t._parsed_url.user)
839
 
        self.assertEquals('host.com', t._parsed_url.host)
 
798
        self.assertEquals('user', t._user)
 
799
        self.assertEquals('host.com', t._host)
840
800
        self.assertIs(None, t._get_connection())
841
 
        self.assertIs(None, t._parsed_url.password)
 
801
        self.assertIs(None, t._password)
842
802
        c = t.clone('subdir')
843
803
        self.assertIs(None, c._get_connection())
844
 
        self.assertIs(None, t._parsed_url.password)
 
804
        self.assertIs(None, t._password)
845
805
 
846
806
        # Simulate the user entering a password
847
807
        password = 'secret'
866
826
 
867
827
    def test_reuse_same_transport(self):
868
828
        possible_transports = []
869
 
        t1 = transport.get_transport_from_url('http://foo/',
 
829
        t1 = transport.get_transport('http://foo/',
870
830
                                     possible_transports=possible_transports)
871
831
        self.assertEqual([t1], possible_transports)
872
 
        t2 = transport.get_transport_from_url('http://foo/',
 
832
        t2 = transport.get_transport('http://foo/',
873
833
                                     possible_transports=[t1])
874
834
        self.assertIs(t1, t2)
875
835
 
876
836
        # Also check that final '/' are handled correctly
877
 
        t3 = transport.get_transport_from_url('http://foo/path/')
878
 
        t4 = transport.get_transport_from_url('http://foo/path',
 
837
        t3 = transport.get_transport('http://foo/path/')
 
838
        t4 = transport.get_transport('http://foo/path',
879
839
                                     possible_transports=[t3])
880
840
        self.assertIs(t3, t4)
881
841
 
882
 
        t5 = transport.get_transport_from_url('http://foo/path')
883
 
        t6 = transport.get_transport_from_url('http://foo/path/',
 
842
        t5 = transport.get_transport('http://foo/path')
 
843
        t6 = transport.get_transport('http://foo/path/',
884
844
                                     possible_transports=[t5])
885
845
        self.assertIs(t5, t6)
886
846
 
887
847
    def test_don_t_reuse_different_transport(self):
888
 
        t1 = transport.get_transport_from_url('http://foo/path')
889
 
        t2 = transport.get_transport_from_url('http://bar/path',
 
848
        t1 = transport.get_transport('http://foo/path')
 
849
        t2 = transport.get_transport('http://bar/path',
890
850
                                     possible_transports=[t1])
891
851
        self.assertIsNot(t1, t2)
892
852
 
893
853
 
894
854
class TestTransportTrace(tests.TestCase):
895
855
 
896
 
    def test_decorator(self):
897
 
        t = transport.get_transport_from_url('trace+memory://')
898
 
        self.assertIsInstance(
899
 
            t, bzrlib.transport.trace.TransportTraceDecorator)
 
856
    def test_get(self):
 
857
        t = transport.get_transport('trace+memory://')
 
858
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
900
859
 
901
860
    def test_clone_preserves_activity(self):
902
 
        t = transport.get_transport_from_url('trace+memory://')
 
861
        t = transport.get_transport('trace+memory://')
903
862
        t2 = t.clone('.')
904
863
        self.assertTrue(t is not t2)
905
864
        self.assertTrue(t._activity is t2._activity)
909
868
    # still won't cause a test failure when the top level Transport API
910
869
    # changes; so there is little return doing that.
911
870
    def test_get(self):
912
 
        t = transport.get_transport_from_url('trace+memory:///')
 
871
        t = transport.get_transport('trace+memory:///')
913
872
        t.put_bytes('foo', 'barish')
914
873
        t.get('foo')
915
874
        expected_result = []
921
880
        self.assertEqual(expected_result, t._activity)
922
881
 
923
882
    def test_readv(self):
924
 
        t = transport.get_transport_from_url('trace+memory:///')
 
883
        t = transport.get_transport('trace+memory:///')
925
884
        t.put_bytes('foo', 'barish')
926
885
        list(t.readv('foo', [(0, 1), (3, 2)],
927
886
                     adjust_for_latency=True, upper_limit=6))
937
896
class TestSSHConnections(tests.TestCaseWithTransport):
938
897
 
939
898
    def test_bzr_connect_to_bzr_ssh(self):
940
 
        """get_transport of a bzr+ssh:// behaves correctly.
 
899
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
941
900
 
942
901
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
943
902
        """
959
918
        # SSH channel ourselves.  Surely this has already been implemented
960
919
        # elsewhere?
961
920
        started = []
962
 
 
963
921
        class StubSSHServer(stub_sftp.StubServer):
964
922
 
965
923
            test = self
971
929
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
972
930
 
973
931
                # XXX: horribly inefficient, not to mention ugly.
974
 
                # Start a thread for each of stdin/out/err, and relay bytes
975
 
                # from the subprocess to channel and vice versa.
 
932
                # Start a thread for each of stdin/out/err, and relay bytes from
 
933
                # the subprocess to channel and vice versa.
976
934
                def ferry_bytes(read, write, close):
977
935
                    while True:
978
936
                        bytes = read(1)
997
955
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
998
956
        # We *don't* want to override the default SSH vendor: the detected one
999
957
        # is the one to use.
1000
 
 
1001
 
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
1002
 
        # inherits from SFTPServer which forces the SSH vendor to
1003
 
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
1004
958
        self.start_server(ssh_server)
1005
 
        port = ssh_server.port
 
959
        port = ssh_server._listener.port
1006
960
 
1007
961
        if sys.platform == 'win32':
1008
962
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
1009
963
        else:
1010
964
            bzr_remote_path = self.get_bzr_path()
1011
 
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
 
965
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
1012
966
 
1013
967
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
1014
968
        # variable is used to tell bzr what command to run on the remote end.
1035
989
        # And the rest are threads
1036
990
        for t in started[1:]:
1037
991
            t.join()
1038
 
 
1039
 
 
1040
 
class TestUnhtml(tests.TestCase):
1041
 
 
1042
 
    """Tests for unhtml_roughly"""
1043
 
 
1044
 
    def test_truncation(self):
1045
 
        fake_html = "<p>something!\n" * 1000
1046
 
        result = http.unhtml_roughly(fake_html)
1047
 
        self.assertEquals(len(result), 1000)
1048
 
        self.assertStartsWith(result, " something!")
1049
 
 
1050
 
 
1051
 
class SomeDirectory(object):
1052
 
 
1053
 
    def look_up(self, name, url):
1054
 
        return "http://bar"
1055
 
 
1056
 
 
1057
 
class TestLocationToUrl(tests.TestCase):
1058
 
 
1059
 
    def get_base_location(self):
1060
 
        path = osutils.abspath('/foo/bar')
1061
 
        if path.startswith('/'):
1062
 
            url = 'file://%s' % (path,)
1063
 
        else:
1064
 
            # On Windows, abspaths start with the drive letter, so we have to
1065
 
            # add in the extra '/'
1066
 
            url = 'file:///%s' % (path,)
1067
 
        return path, url
1068
 
 
1069
 
    def test_regular_url(self):
1070
 
        self.assertEquals("file://foo", location_to_url("file://foo"))
1071
 
 
1072
 
    def test_directory(self):
1073
 
        directories.register("bar:", SomeDirectory, "Dummy directory")
1074
 
        self.addCleanup(directories.remove, "bar:")
1075
 
        self.assertEquals("http://bar", location_to_url("bar:"))
1076
 
 
1077
 
    def test_unicode_url(self):
1078
 
        self.assertRaises(errors.InvalidURL, location_to_url,
1079
 
            "http://fo/\xc3\xaf".decode("utf-8"))
1080
 
 
1081
 
    def test_unicode_path(self):
1082
 
        path, url = self.get_base_location()
1083
 
        location = path + "\xc3\xaf".decode("utf-8")
1084
 
        url += '%C3%AF'
1085
 
        self.assertEquals(url, location_to_url(location))
1086
 
 
1087
 
    def test_path(self):
1088
 
        path, url = self.get_base_location()
1089
 
        self.assertEquals(url, location_to_url(path))
1090
 
 
1091
 
    def test_relative_file_url(self):
1092
 
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
1093
 
            location_to_url("file:bar"))
1094
 
 
1095
 
    def test_absolute_file_url(self):
1096
 
        self.assertEquals("file:///bar", location_to_url("file:/bar"))