~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Andrew Bennetts
  • Date: 2010-07-29 11:17:57 UTC
  • mfrom: (5050.3.17 2.2)
  • mto: This revision was merged to the branch mainline in revision 5365.
  • Revision ID: andrew.bennetts@canonical.com-20100729111757-018h3pcefo7z0dnq
Merge lp:bzr/2.2 into lp:bzr.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
28
28
    transport,
29
29
    urlutils,
30
30
    )
31
 
from bzrlib.directory_service import directories
32
31
from bzrlib.transport import (
33
32
    chroot,
34
33
    fakenfs,
35
 
    http,
36
34
    local,
37
 
    location_to_url,
38
35
    memory,
39
36
    pathfilter,
40
37
    readonly,
41
38
    )
42
 
import bzrlib.transport.trace
43
39
from bzrlib.tests import (
44
40
    features,
45
41
    test_server,
52
48
class TestTransport(tests.TestCase):
53
49
    """Test the non transport-concrete class functionality."""
54
50
 
 
51
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
 
52
    # of try/finally -- vila 20100205
 
53
 
55
54
    def test__get_set_protocol_handlers(self):
56
55
        handlers = transport._get_protocol_handlers()
57
 
        self.assertNotEqual([], handlers.keys())
58
 
        transport._clear_protocol_handlers()
59
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
60
 
        self.assertEqual([], transport._get_protocol_handlers().keys())
 
56
        self.assertNotEqual([], handlers.keys( ))
 
57
        try:
 
58
            transport._clear_protocol_handlers()
 
59
            self.assertEqual([], transport._get_protocol_handlers().keys())
 
60
        finally:
 
61
            transport._set_protocol_handlers(handlers)
61
62
 
62
63
    def test_get_transport_modules(self):
63
64
        handlers = transport._get_protocol_handlers()
64
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
65
        # don't pollute the current handlers
66
66
        transport._clear_protocol_handlers()
67
 
 
68
67
        class SampleHandler(object):
69
68
            """I exist, isnt that enough?"""
70
 
        transport._clear_protocol_handlers()
71
 
        transport.register_transport_proto('foo')
72
 
        transport.register_lazy_transport('foo',
73
 
                                            'bzrlib.tests.test_transport',
74
 
                                            'TestTransport.SampleHandler')
75
 
        transport.register_transport_proto('bar')
76
 
        transport.register_lazy_transport('bar',
77
 
                                            'bzrlib.tests.test_transport',
78
 
                                            'TestTransport.SampleHandler')
79
 
        self.assertEqual([SampleHandler.__module__,
80
 
                            'bzrlib.transport.chroot',
81
 
                            'bzrlib.transport.pathfilter'],
82
 
                            transport._get_transport_modules())
 
69
        try:
 
70
            transport._clear_protocol_handlers()
 
71
            transport.register_transport_proto('foo')
 
72
            transport.register_lazy_transport('foo',
 
73
                                              'bzrlib.tests.test_transport',
 
74
                                              'TestTransport.SampleHandler')
 
75
            transport.register_transport_proto('bar')
 
76
            transport.register_lazy_transport('bar',
 
77
                                              'bzrlib.tests.test_transport',
 
78
                                              'TestTransport.SampleHandler')
 
79
            self.assertEqual([SampleHandler.__module__,
 
80
                              'bzrlib.transport.chroot',
 
81
                              'bzrlib.transport.pathfilter'],
 
82
                             transport._get_transport_modules())
 
83
        finally:
 
84
            transport._set_protocol_handlers(handlers)
83
85
 
84
86
    def test_transport_dependency(self):
85
87
        """Transport with missing dependency causes no error"""
86
88
        saved_handlers = transport._get_protocol_handlers()
87
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
88
89
        # don't pollute the current handlers
89
90
        transport._clear_protocol_handlers()
90
 
        transport.register_transport_proto('foo')
91
 
        transport.register_lazy_transport(
92
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
93
91
        try:
94
 
            transport.get_transport_from_url('foo://fooserver/foo')
95
 
        except errors.UnsupportedProtocol, e:
96
 
            e_str = str(e)
97
 
            self.assertEquals('Unsupported protocol'
98
 
                                ' for url "foo://fooserver/foo":'
99
 
                                ' Unable to import library "some_lib":'
100
 
                                ' testing missing dependency', str(e))
101
 
        else:
102
 
            self.fail('Did not raise UnsupportedProtocol')
 
92
            transport.register_transport_proto('foo')
 
93
            transport.register_lazy_transport(
 
94
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
95
            try:
 
96
                transport.get_transport('foo://fooserver/foo')
 
97
            except errors.UnsupportedProtocol, e:
 
98
                e_str = str(e)
 
99
                self.assertEquals('Unsupported protocol'
 
100
                                  ' for url "foo://fooserver/foo":'
 
101
                                  ' Unable to import library "some_lib":'
 
102
                                  ' testing missing dependency', str(e))
 
103
            else:
 
104
                self.fail('Did not raise UnsupportedProtocol')
 
105
        finally:
 
106
            # restore original values
 
107
            transport._set_protocol_handlers(saved_handlers)
103
108
 
104
109
    def test_transport_fallback(self):
105
110
        """Transport with missing dependency causes no error"""
106
111
        saved_handlers = transport._get_protocol_handlers()
107
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
108
 
        transport._clear_protocol_handlers()
109
 
        transport.register_transport_proto('foo')
110
 
        transport.register_lazy_transport(
111
 
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
112
 
        transport.register_lazy_transport(
113
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
114
 
        t = transport.get_transport_from_url('foo://fooserver/foo')
115
 
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
112
        try:
 
113
            transport._clear_protocol_handlers()
 
114
            transport.register_transport_proto('foo')
 
115
            transport.register_lazy_transport(
 
116
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
117
            transport.register_lazy_transport(
 
118
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
119
            t = transport.get_transport('foo://fooserver/foo')
 
120
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
121
        finally:
 
122
            transport._set_protocol_handlers(saved_handlers)
116
123
 
117
124
    def test_ssh_hints(self):
118
125
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
119
126
        try:
120
 
            transport.get_transport_from_url('ssh://fooserver/foo')
 
127
            transport.get_transport('ssh://fooserver/foo')
121
128
        except errors.UnsupportedProtocol, e:
122
129
            e_str = str(e)
123
130
            self.assertEquals('Unsupported protocol'
138
145
        self.assertRaises(errors.ReadError, a_file.read, 40)
139
146
        a_file.close()
140
147
 
 
148
    def test__combine_paths(self):
 
149
        t = transport.Transport('/')
 
150
        self.assertEqual('/home/sarah/project/foo',
 
151
                         t._combine_paths('/home/sarah', 'project/foo'))
 
152
        self.assertEqual('/etc',
 
153
                         t._combine_paths('/home/sarah', '../../etc'))
 
154
        self.assertEqual('/etc',
 
155
                         t._combine_paths('/home/sarah', '../../../etc'))
 
156
        self.assertEqual('/etc',
 
157
                         t._combine_paths('/home/sarah', '/etc'))
 
158
 
141
159
    def test_local_abspath_non_local_transport(self):
142
160
        # the base implementation should throw
143
161
        t = memory.MemoryTransport()
200
218
 
201
219
    def test_coalesce_fudge(self):
202
220
        self.check([(10, 30, [(0, 10), (20, 10)]),
203
 
                    (100, 10, [(0, 10)]),
 
221
                    (100, 10, [(0, 10),]),
204
222
                   ], [(10, 10), (30, 10), (100, 10)],
205
 
                   fudge=10)
206
 
 
 
223
                   fudge=10
 
224
                  )
207
225
    def test_coalesce_max_size(self):
208
226
        self.check([(10, 20, [(0, 10), (10, 10)]),
209
227
                    (30, 50, [(0, 50)]),
210
228
                    # If one range is above max_size, it gets its own coalesced
211
229
                    # offset
212
 
                    (100, 80, [(0, 80)]),],
 
230
                    (100, 80, [(0, 80),]),],
213
231
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
214
 
                   max_size=50)
 
232
                   max_size=50
 
233
                  )
215
234
 
216
235
    def test_coalesce_no_max_size(self):
217
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
 
236
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
218
237
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
219
238
                  )
220
239
 
221
240
    def test_coalesce_default_limit(self):
222
241
        # By default we use a 100MB max size.
223
 
        ten_mb = 10 * 1024 * 1024
224
 
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
242
        ten_mb = 10*1024*1024
 
243
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
225
244
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
226
245
                   [(i*ten_mb, ten_mb) for i in range(11)])
227
 
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
228
 
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
246
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
247
                   [(i*ten_mb, ten_mb) for i in range(11)],
229
248
                   max_size=1*1024*1024*1024)
230
249
 
231
250
 
236
255
        server.start_server()
237
256
        url = server.get_url()
238
257
        self.assertTrue(url in transport.transport_list_registry)
239
 
        t = transport.get_transport_from_url(url)
 
258
        t = transport.get_transport(url)
240
259
        del t
241
260
        server.stop_server()
242
261
        self.assertFalse(url in transport.transport_list_registry)
357
376
    def test_abspath(self):
358
377
        # The abspath is always relative to the chroot_url.
359
378
        server = chroot.ChrootServer(
360
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
379
            transport.get_transport('memory:///foo/bar/'))
361
380
        self.start_server(server)
362
 
        t = transport.get_transport_from_url(server.get_url())
 
381
        t = transport.get_transport(server.get_url())
363
382
        self.assertEqual(server.get_url(), t.abspath('/'))
364
383
 
365
384
        subdir_t = t.clone('subdir')
367
386
 
368
387
    def test_clone(self):
369
388
        server = chroot.ChrootServer(
370
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
389
            transport.get_transport('memory:///foo/bar/'))
371
390
        self.start_server(server)
372
 
        t = transport.get_transport_from_url(server.get_url())
 
391
        t = transport.get_transport(server.get_url())
373
392
        # relpath from root and root path are the same
374
393
        relpath_cloned = t.clone('foo')
375
394
        abspath_cloned = t.clone('/foo')
384
403
        This is so that it is not possible to escape a chroot by doing::
385
404
            url = chroot_transport.base
386
405
            parent_url = urlutils.join(url, '..')
387
 
            new_t = transport.get_transport_from_url(parent_url)
 
406
            new_t = transport.get_transport(parent_url)
388
407
        """
389
408
        server = chroot.ChrootServer(
390
 
            transport.get_transport_from_url('memory:///path/subpath'))
 
409
            transport.get_transport('memory:///path/subpath'))
391
410
        self.start_server(server)
392
 
        t = transport.get_transport_from_url(server.get_url())
393
 
        new_t = transport.get_transport_from_url(t.base)
 
411
        t = transport.get_transport(server.get_url())
 
412
        new_t = transport.get_transport(t.base)
394
413
        self.assertEqual(t.server, new_t.server)
395
414
        self.assertEqual(t.base, new_t.base)
396
415
 
401
420
        This is so that it is not possible to escape a chroot by doing::
402
421
            url = chroot_transport.base
403
422
            parent_url = urlutils.join(url, '..')
404
 
            new_t = transport.get_transport_from_url(parent_url)
 
423
            new_t = transport.get_transport(parent_url)
405
424
        """
406
 
        server = chroot.ChrootServer(
407
 
            transport.get_transport_from_url('memory:///path/'))
 
425
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
408
426
        self.start_server(server)
409
 
        t = transport.get_transport_from_url(server.get_url())
 
427
        t = transport.get_transport(server.get_url())
410
428
        self.assertRaises(
411
429
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
412
430
 
422
440
        backing_transport = memory.MemoryTransport()
423
441
        server = chroot.ChrootServer(backing_transport)
424
442
        server.start_server()
425
 
        self.addCleanup(server.stop_server)
426
 
        self.assertTrue(server.scheme
427
 
                        in transport._get_protocol_handlers().keys())
 
443
        try:
 
444
            self.assertTrue(server.scheme
 
445
                            in transport._get_protocol_handlers().keys())
 
446
        finally:
 
447
            server.stop_server()
428
448
 
429
449
    def test_stop_server(self):
430
450
        backing_transport = memory.MemoryTransport()
438
458
        backing_transport = memory.MemoryTransport()
439
459
        server = chroot.ChrootServer(backing_transport)
440
460
        server.start_server()
441
 
        self.addCleanup(server.stop_server)
442
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
461
        try:
 
462
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
463
        finally:
 
464
            server.stop_server()
443
465
 
444
466
 
445
467
class PathFilteringDecoratorTransportTest(tests.TestCase):
448
470
    def test_abspath(self):
449
471
        # The abspath is always relative to the base of the backing transport.
450
472
        server = pathfilter.PathFilteringServer(
451
 
            transport.get_transport_from_url('memory:///foo/bar/'),
 
473
            transport.get_transport('memory:///foo/bar/'),
452
474
            lambda x: x)
453
475
        server.start_server()
454
 
        t = transport.get_transport_from_url(server.get_url())
 
476
        t = transport.get_transport(server.get_url())
455
477
        self.assertEqual(server.get_url(), t.abspath('/'))
456
478
 
457
479
        subdir_t = t.clone('subdir')
460
482
 
461
483
    def make_pf_transport(self, filter_func=None):
462
484
        """Make a PathFilteringTransport backed by a MemoryTransport.
463
 
 
 
485
        
464
486
        :param filter_func: by default this will be a no-op function.  Use this
465
487
            parameter to override it."""
466
488
        if filter_func is None:
467
489
            filter_func = lambda x: x
468
490
        server = pathfilter.PathFilteringServer(
469
 
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
 
491
            transport.get_transport('memory:///foo/bar/'), filter_func)
470
492
        server.start_server()
471
493
        self.addCleanup(server.stop_server)
472
 
        return transport.get_transport_from_url(server.get_url())
 
494
        return transport.get_transport(server.get_url())
473
495
 
474
496
    def test__filter(self):
475
497
        # _filter (with an identity func as filter_func) always returns
488
510
 
489
511
    def test_filter_invocation(self):
490
512
        filter_log = []
491
 
 
492
513
        def filter(path):
493
514
            filter_log.append(path)
494
515
            return path
519
540
        otherwise) the filtering by doing::
520
541
            url = filtered_transport.base
521
542
            parent_url = urlutils.join(url, '..')
522
 
            new_t = transport.get_transport_from_url(parent_url)
 
543
            new_t = transport.get_transport(parent_url)
523
544
        """
524
545
        t = self.make_pf_transport()
525
 
        new_t = transport.get_transport_from_url(t.base)
 
546
        new_t = transport.get_transport(t.base)
526
547
        self.assertEqual(t.server, new_t.server)
527
548
        self.assertEqual(t.base, new_t.base)
528
549
 
541
562
        # connect to '.' via http which is not listable
542
563
        server = HttpServer()
543
564
        self.start_server(server)
544
 
        t = transport.get_transport_from_url('readonly+' + server.get_url())
545
 
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
 
565
        t = transport.get_transport('readonly+' + server.get_url())
 
566
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
546
567
        self.assertEqual(False, t.listable())
547
568
        self.assertEqual(True, t.is_readonly())
548
569
 
580
601
        # the url should be decorated appropriately
581
602
        self.assertStartsWith(server.get_url(), 'fakenfs+')
582
603
        # and we should be able to get a transport for it
583
 
        t = transport.get_transport_from_url(server.get_url())
 
604
        t = transport.get_transport(server.get_url())
584
605
        # which must be a FakeNFSTransportDecorator instance.
585
606
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
586
607
 
663
684
        base_url = self._server.get_url()
664
685
        url = self._adjust_url(base_url, relpath)
665
686
        # try getting the transport via the regular interface:
666
 
        t = transport.get_transport_from_url(url)
 
687
        t = transport.get_transport(url)
667
688
        # vila--20070607 if the following are commented out the test suite
668
689
        # still pass. Is this really still needed or was it a forgotten
669
690
        # temporary fix ?
674
695
        return t
675
696
 
676
697
 
677
 
class TestTransportFromPath(tests.TestCaseInTempDir):
678
 
 
679
 
    def test_with_path(self):
680
 
        t = transport.get_transport_from_path(self.test_dir)
681
 
        self.assertIsInstance(t, local.LocalTransport)
682
 
        self.assertEquals(t.base.rstrip("/"),
683
 
            urlutils.local_path_to_url(self.test_dir))
684
 
 
685
 
    def test_with_url(self):
686
 
        t = transport.get_transport_from_path("file:")
687
 
        self.assertIsInstance(t, local.LocalTransport)
688
 
        self.assertEquals(t.base.rstrip("/"),
689
 
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
690
 
 
691
 
 
692
 
class TestTransportFromUrl(tests.TestCaseInTempDir):
693
 
 
694
 
    def test_with_path(self):
695
 
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
696
 
            self.test_dir)
697
 
 
698
 
    def test_with_url(self):
699
 
        url = urlutils.local_path_to_url(self.test_dir)
700
 
        t = transport.get_transport_from_url(url)
701
 
        self.assertIsInstance(t, local.LocalTransport)
702
 
        self.assertEquals(t.base.rstrip("/"), url)
703
 
 
704
 
    def test_with_url_and_segment_parameters(self):
705
 
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
706
 
        t = transport.get_transport_from_url(url)
707
 
        self.assertIsInstance(t, local.LocalTransport)
708
 
        self.assertEquals(t.base.rstrip("/"), url)
709
 
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
710
 
            f.write("data")
711
 
        self.assertTrue(t.has("afile"))
712
 
 
713
 
 
714
698
class TestLocalTransports(tests.TestCase):
715
699
 
716
700
    def test_get_transport_from_abspath(self):
738
722
        self.assertEquals(t.local_abspath(''), here)
739
723
 
740
724
 
741
 
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
742
 
 
743
 
    def test_local_fdatasync_calls_fdatasync(self):
744
 
        """Check fdatasync on a stream tries to flush the data to the OS.
745
 
        
746
 
        We can't easily observe the external effect but we can at least see
747
 
        it's called.
748
 
        """
749
 
        sentinel = object()
750
 
        fdatasync = getattr(os, 'fdatasync', sentinel)
751
 
        if fdatasync is sentinel:
752
 
            raise tests.TestNotApplicable('fdatasync not supported')
753
 
        t = self.get_transport('.')
754
 
        calls = self.recordCalls(os, 'fdatasync')
755
 
        w = t.open_write_stream('out')
756
 
        w.write('foo')
757
 
        w.fdatasync()
758
 
        with open('out', 'rb') as f:
759
 
            # Should have been flushed.
760
 
            self.assertEquals(f.read(), 'foo')
761
 
        self.assertEquals(len(calls), 1, calls)
762
 
 
763
 
    def test_missing_directory(self):
764
 
        t = self.get_transport('.')
765
 
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
766
 
 
767
 
 
768
725
class TestWin32LocalTransport(tests.TestCase):
769
726
 
770
727
    def test_unc_clone_to_root(self):
786
743
    def test_parse_url(self):
787
744
        t = transport.ConnectedTransport(
788
745
            'http://simple.example.com/home/source')
789
 
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
790
 
        self.assertEquals(t._parsed_url.port, None)
791
 
        self.assertEquals(t._parsed_url.path, '/home/source/')
792
 
        self.assertTrue(t._parsed_url.user is None)
793
 
        self.assertTrue(t._parsed_url.password is None)
 
746
        self.assertEquals(t._host, 'simple.example.com')
 
747
        self.assertEquals(t._port, None)
 
748
        self.assertEquals(t._path, '/home/source/')
 
749
        self.failUnless(t._user is None)
 
750
        self.failUnless(t._password is None)
794
751
 
795
752
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
796
753
 
797
754
    def test_parse_url_with_at_in_user(self):
798
755
        # Bug 228058
799
756
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
800
 
        self.assertEquals(t._parsed_url.user, 'user@host.com')
 
757
        self.assertEquals(t._user, 'user@host.com')
801
758
 
802
759
    def test_parse_quoted_url(self):
803
760
        t = transport.ConnectedTransport(
804
761
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
805
 
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
806
 
        self.assertEquals(t._parsed_url.port, 2222)
807
 
        self.assertEquals(t._parsed_url.user, 'robey')
808
 
        self.assertEquals(t._parsed_url.password, 'h@t')
809
 
        self.assertEquals(t._parsed_url.path, '/path/')
 
762
        self.assertEquals(t._host, 'exAmple.com')
 
763
        self.assertEquals(t._port, 2222)
 
764
        self.assertEquals(t._user, 'robey')
 
765
        self.assertEquals(t._password, 'h@t')
 
766
        self.assertEquals(t._path, '/path/')
810
767
 
811
768
        # Base should not keep track of the password
812
 
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
 
769
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
813
770
 
814
771
    def test_parse_invalid_url(self):
815
772
        self.assertRaises(errors.InvalidURL,
819
776
    def test_relpath(self):
820
777
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
821
778
 
822
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
823
 
            'sub')
 
779
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
824
780
        self.assertRaises(errors.PathNotChild, t.relpath,
825
781
                          'http://user@host.com/abs/path/sub')
826
782
        self.assertRaises(errors.PathNotChild, t.relpath,
839
795
 
840
796
    def test_connection_sharing_propagate_credentials(self):
841
797
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
842
 
        self.assertEquals('user', t._parsed_url.user)
843
 
        self.assertEquals('host.com', t._parsed_url.host)
 
798
        self.assertEquals('user', t._user)
 
799
        self.assertEquals('host.com', t._host)
844
800
        self.assertIs(None, t._get_connection())
845
 
        self.assertIs(None, t._parsed_url.password)
 
801
        self.assertIs(None, t._password)
846
802
        c = t.clone('subdir')
847
803
        self.assertIs(None, c._get_connection())
848
 
        self.assertIs(None, t._parsed_url.password)
 
804
        self.assertIs(None, t._password)
849
805
 
850
806
        # Simulate the user entering a password
851
807
        password = 'secret'
870
826
 
871
827
    def test_reuse_same_transport(self):
872
828
        possible_transports = []
873
 
        t1 = transport.get_transport_from_url('http://foo/',
 
829
        t1 = transport.get_transport('http://foo/',
874
830
                                     possible_transports=possible_transports)
875
831
        self.assertEqual([t1], possible_transports)
876
 
        t2 = transport.get_transport_from_url('http://foo/',
 
832
        t2 = transport.get_transport('http://foo/',
877
833
                                     possible_transports=[t1])
878
834
        self.assertIs(t1, t2)
879
835
 
880
836
        # Also check that final '/' are handled correctly
881
 
        t3 = transport.get_transport_from_url('http://foo/path/')
882
 
        t4 = transport.get_transport_from_url('http://foo/path',
 
837
        t3 = transport.get_transport('http://foo/path/')
 
838
        t4 = transport.get_transport('http://foo/path',
883
839
                                     possible_transports=[t3])
884
840
        self.assertIs(t3, t4)
885
841
 
886
 
        t5 = transport.get_transport_from_url('http://foo/path')
887
 
        t6 = transport.get_transport_from_url('http://foo/path/',
 
842
        t5 = transport.get_transport('http://foo/path')
 
843
        t6 = transport.get_transport('http://foo/path/',
888
844
                                     possible_transports=[t5])
889
845
        self.assertIs(t5, t6)
890
846
 
891
847
    def test_don_t_reuse_different_transport(self):
892
 
        t1 = transport.get_transport_from_url('http://foo/path')
893
 
        t2 = transport.get_transport_from_url('http://bar/path',
 
848
        t1 = transport.get_transport('http://foo/path')
 
849
        t2 = transport.get_transport('http://bar/path',
894
850
                                     possible_transports=[t1])
895
851
        self.assertIsNot(t1, t2)
896
852
 
897
853
 
898
854
class TestTransportTrace(tests.TestCase):
899
855
 
900
 
    def test_decorator(self):
901
 
        t = transport.get_transport_from_url('trace+memory://')
902
 
        self.assertIsInstance(
903
 
            t, bzrlib.transport.trace.TransportTraceDecorator)
 
856
    def test_get(self):
 
857
        t = transport.get_transport('trace+memory://')
 
858
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
904
859
 
905
860
    def test_clone_preserves_activity(self):
906
 
        t = transport.get_transport_from_url('trace+memory://')
 
861
        t = transport.get_transport('trace+memory://')
907
862
        t2 = t.clone('.')
908
863
        self.assertTrue(t is not t2)
909
864
        self.assertTrue(t._activity is t2._activity)
913
868
    # still won't cause a test failure when the top level Transport API
914
869
    # changes; so there is little return doing that.
915
870
    def test_get(self):
916
 
        t = transport.get_transport_from_url('trace+memory:///')
 
871
        t = transport.get_transport('trace+memory:///')
917
872
        t.put_bytes('foo', 'barish')
918
873
        t.get('foo')
919
874
        expected_result = []
925
880
        self.assertEqual(expected_result, t._activity)
926
881
 
927
882
    def test_readv(self):
928
 
        t = transport.get_transport_from_url('trace+memory:///')
 
883
        t = transport.get_transport('trace+memory:///')
929
884
        t.put_bytes('foo', 'barish')
930
885
        list(t.readv('foo', [(0, 1), (3, 2)],
931
886
                     adjust_for_latency=True, upper_limit=6))
941
896
class TestSSHConnections(tests.TestCaseWithTransport):
942
897
 
943
898
    def test_bzr_connect_to_bzr_ssh(self):
944
 
        """get_transport of a bzr+ssh:// behaves correctly.
 
899
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
945
900
 
946
901
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
947
902
        """
963
918
        # SSH channel ourselves.  Surely this has already been implemented
964
919
        # elsewhere?
965
920
        started = []
966
 
 
967
921
        class StubSSHServer(stub_sftp.StubServer):
968
922
 
969
923
            test = self
975
929
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
976
930
 
977
931
                # XXX: horribly inefficient, not to mention ugly.
978
 
                # Start a thread for each of stdin/out/err, and relay bytes
979
 
                # from the subprocess to channel and vice versa.
 
932
                # Start a thread for each of stdin/out/err, and relay bytes from
 
933
                # the subprocess to channel and vice versa.
980
934
                def ferry_bytes(read, write, close):
981
935
                    while True:
982
936
                        bytes = read(1)
1001
955
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
1002
956
        # We *don't* want to override the default SSH vendor: the detected one
1003
957
        # is the one to use.
1004
 
 
1005
 
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
1006
 
        # inherits from SFTPServer which forces the SSH vendor to
1007
 
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
1008
958
        self.start_server(ssh_server)
1009
 
        port = ssh_server.port
 
959
        port = ssh_server._listener.port
1010
960
 
1011
961
        if sys.platform == 'win32':
1012
962
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
1013
963
        else:
1014
964
            bzr_remote_path = self.get_bzr_path()
1015
 
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
 
965
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
1016
966
 
1017
967
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
1018
968
        # variable is used to tell bzr what command to run on the remote end.
1039
989
        # And the rest are threads
1040
990
        for t in started[1:]:
1041
991
            t.join()
1042
 
 
1043
 
 
1044
 
class TestUnhtml(tests.TestCase):
1045
 
 
1046
 
    """Tests for unhtml_roughly"""
1047
 
 
1048
 
    def test_truncation(self):
1049
 
        fake_html = "<p>something!\n" * 1000
1050
 
        result = http.unhtml_roughly(fake_html)
1051
 
        self.assertEquals(len(result), 1000)
1052
 
        self.assertStartsWith(result, " something!")
1053
 
 
1054
 
 
1055
 
class SomeDirectory(object):
1056
 
 
1057
 
    def look_up(self, name, url):
1058
 
        return "http://bar"
1059
 
 
1060
 
 
1061
 
class TestLocationToUrl(tests.TestCase):
1062
 
 
1063
 
    def get_base_location(self):
1064
 
        path = osutils.abspath('/foo/bar')
1065
 
        if path.startswith('/'):
1066
 
            url = 'file://%s' % (path,)
1067
 
        else:
1068
 
            # On Windows, abspaths start with the drive letter, so we have to
1069
 
            # add in the extra '/'
1070
 
            url = 'file:///%s' % (path,)
1071
 
        return path, url
1072
 
 
1073
 
    def test_regular_url(self):
1074
 
        self.assertEquals("file://foo", location_to_url("file://foo"))
1075
 
 
1076
 
    def test_directory(self):
1077
 
        directories.register("bar:", SomeDirectory, "Dummy directory")
1078
 
        self.addCleanup(directories.remove, "bar:")
1079
 
        self.assertEquals("http://bar", location_to_url("bar:"))
1080
 
 
1081
 
    def test_unicode_url(self):
1082
 
        self.assertRaises(errors.InvalidURL, location_to_url,
1083
 
            "http://fo/\xc3\xaf".decode("utf-8"))
1084
 
 
1085
 
    def test_unicode_path(self):
1086
 
        path, url = self.get_base_location()
1087
 
        location = path + "\xc3\xaf".decode("utf-8")
1088
 
        url += '%C3%AF'
1089
 
        self.assertEquals(url, location_to_url(location))
1090
 
 
1091
 
    def test_path(self):
1092
 
        path, url = self.get_base_location()
1093
 
        self.assertEquals(url, location_to_url(path))
1094
 
 
1095
 
    def test_relative_file_url(self):
1096
 
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
1097
 
            location_to_url("file:bar"))
1098
 
 
1099
 
    def test_absolute_file_url(self):
1100
 
        self.assertEquals("file:///bar", location_to_url("file:/bar"))