~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2011-12-16 19:18:39 UTC
  • mto: This revision was merged to the branch mainline in revision 6391.
  • Revision ID: jelmer@samba.org-20111216191839-eg681lxqibi1qxu1
Fix remaining tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import errno
19
20
import os
20
21
import subprocess
21
22
import sys
28
29
    transport,
29
30
    urlutils,
30
31
    )
 
32
from bzrlib.directory_service import directories
31
33
from bzrlib.transport import (
32
34
    chroot,
33
35
    fakenfs,
34
36
    http,
35
37
    local,
 
38
    location_to_url,
36
39
    memory,
37
40
    pathfilter,
38
41
    readonly,
39
42
    )
 
43
import bzrlib.transport.trace
40
44
from bzrlib.tests import (
41
45
    features,
42
46
    test_server,
49
53
class TestTransport(tests.TestCase):
50
54
    """Test the non transport-concrete class functionality."""
51
55
 
52
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
53
 
    # of try/finally -- vila 20100205
54
 
 
55
56
    def test__get_set_protocol_handlers(self):
56
57
        handlers = transport._get_protocol_handlers()
57
 
        self.assertNotEqual([], handlers.keys( ))
58
 
        try:
59
 
            transport._clear_protocol_handlers()
60
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
61
 
        finally:
62
 
            transport._set_protocol_handlers(handlers)
 
58
        self.assertNotEqual([], handlers.keys())
 
59
        transport._clear_protocol_handlers()
 
60
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
61
        self.assertEqual([], transport._get_protocol_handlers().keys())
63
62
 
64
63
    def test_get_transport_modules(self):
65
64
        handlers = transport._get_protocol_handlers()
 
65
        self.addCleanup(transport._set_protocol_handlers, handlers)
66
66
        # don't pollute the current handlers
67
67
        transport._clear_protocol_handlers()
 
68
 
68
69
        class SampleHandler(object):
69
70
            """I exist, isnt that enough?"""
70
 
        try:
71
 
            transport._clear_protocol_handlers()
72
 
            transport.register_transport_proto('foo')
73
 
            transport.register_lazy_transport('foo',
74
 
                                              'bzrlib.tests.test_transport',
75
 
                                              'TestTransport.SampleHandler')
76
 
            transport.register_transport_proto('bar')
77
 
            transport.register_lazy_transport('bar',
78
 
                                              'bzrlib.tests.test_transport',
79
 
                                              'TestTransport.SampleHandler')
80
 
            self.assertEqual([SampleHandler.__module__,
81
 
                              'bzrlib.transport.chroot',
82
 
                              'bzrlib.transport.pathfilter'],
83
 
                             transport._get_transport_modules())
84
 
        finally:
85
 
            transport._set_protocol_handlers(handlers)
 
71
        transport._clear_protocol_handlers()
 
72
        transport.register_transport_proto('foo')
 
73
        transport.register_lazy_transport('foo',
 
74
                                            'bzrlib.tests.test_transport',
 
75
                                            'TestTransport.SampleHandler')
 
76
        transport.register_transport_proto('bar')
 
77
        transport.register_lazy_transport('bar',
 
78
                                            'bzrlib.tests.test_transport',
 
79
                                            'TestTransport.SampleHandler')
 
80
        self.assertEqual([SampleHandler.__module__,
 
81
                            'bzrlib.transport.chroot',
 
82
                            'bzrlib.transport.pathfilter'],
 
83
                            transport._get_transport_modules())
86
84
 
87
85
    def test_transport_dependency(self):
88
86
        """Transport with missing dependency causes no error"""
89
87
        saved_handlers = transport._get_protocol_handlers()
 
88
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
90
89
        # don't pollute the current handlers
91
90
        transport._clear_protocol_handlers()
 
91
        transport.register_transport_proto('foo')
 
92
        transport.register_lazy_transport(
 
93
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
92
94
        try:
93
 
            transport.register_transport_proto('foo')
94
 
            transport.register_lazy_transport(
95
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
96
 
            try:
97
 
                transport.get_transport('foo://fooserver/foo')
98
 
            except errors.UnsupportedProtocol, e:
99
 
                e_str = str(e)
100
 
                self.assertEquals('Unsupported protocol'
101
 
                                  ' for url "foo://fooserver/foo":'
102
 
                                  ' Unable to import library "some_lib":'
103
 
                                  ' testing missing dependency', str(e))
104
 
            else:
105
 
                self.fail('Did not raise UnsupportedProtocol')
106
 
        finally:
107
 
            # restore original values
108
 
            transport._set_protocol_handlers(saved_handlers)
 
95
            transport.get_transport_from_url('foo://fooserver/foo')
 
96
        except errors.UnsupportedProtocol, e:
 
97
            e_str = str(e)
 
98
            self.assertEquals('Unsupported protocol'
 
99
                                ' for url "foo://fooserver/foo":'
 
100
                                ' Unable to import library "some_lib":'
 
101
                                ' testing missing dependency', str(e))
 
102
        else:
 
103
            self.fail('Did not raise UnsupportedProtocol')
109
104
 
110
105
    def test_transport_fallback(self):
111
106
        """Transport with missing dependency causes no error"""
112
107
        saved_handlers = transport._get_protocol_handlers()
113
 
        try:
114
 
            transport._clear_protocol_handlers()
115
 
            transport.register_transport_proto('foo')
116
 
            transport.register_lazy_transport(
117
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
118
 
            transport.register_lazy_transport(
119
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
120
 
            t = transport.get_transport('foo://fooserver/foo')
121
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
122
 
        finally:
123
 
            transport._set_protocol_handlers(saved_handlers)
 
108
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
109
        transport._clear_protocol_handlers()
 
110
        transport.register_transport_proto('foo')
 
111
        transport.register_lazy_transport(
 
112
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
113
        transport.register_lazy_transport(
 
114
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
115
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
116
        self.assertTrue(isinstance(t, BackupTransportHandler))
124
117
 
125
118
    def test_ssh_hints(self):
126
119
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
127
120
        try:
128
 
            transport.get_transport('ssh://fooserver/foo')
 
121
            transport.get_transport_from_url('ssh://fooserver/foo')
129
122
        except errors.UnsupportedProtocol, e:
130
123
            e_str = str(e)
131
124
            self.assertEquals('Unsupported protocol'
146
139
        self.assertRaises(errors.ReadError, a_file.read, 40)
147
140
        a_file.close()
148
141
 
149
 
    def test__combine_paths(self):
150
 
        t = transport.Transport('/')
151
 
        self.assertEqual('/home/sarah/project/foo',
152
 
                         t._combine_paths('/home/sarah', 'project/foo'))
153
 
        self.assertEqual('/etc',
154
 
                         t._combine_paths('/home/sarah', '../../etc'))
155
 
        self.assertEqual('/etc',
156
 
                         t._combine_paths('/home/sarah', '../../../etc'))
157
 
        self.assertEqual('/etc',
158
 
                         t._combine_paths('/home/sarah', '/etc'))
159
 
 
160
142
    def test_local_abspath_non_local_transport(self):
161
143
        # the base implementation should throw
162
144
        t = memory.MemoryTransport()
219
201
 
220
202
    def test_coalesce_fudge(self):
221
203
        self.check([(10, 30, [(0, 10), (20, 10)]),
222
 
                    (100, 10, [(0, 10),]),
 
204
                    (100, 10, [(0, 10)]),
223
205
                   ], [(10, 10), (30, 10), (100, 10)],
224
 
                   fudge=10
225
 
                  )
 
206
                   fudge=10)
 
207
 
226
208
    def test_coalesce_max_size(self):
227
209
        self.check([(10, 20, [(0, 10), (10, 10)]),
228
210
                    (30, 50, [(0, 50)]),
229
211
                    # If one range is above max_size, it gets its own coalesced
230
212
                    # offset
231
 
                    (100, 80, [(0, 80),]),],
 
213
                    (100, 80, [(0, 80)]),],
232
214
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
233
 
                   max_size=50
234
 
                  )
 
215
                   max_size=50)
235
216
 
236
217
    def test_coalesce_no_max_size(self):
237
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
218
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
238
219
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
239
220
                  )
240
221
 
241
222
    def test_coalesce_default_limit(self):
242
223
        # By default we use a 100MB max size.
243
 
        ten_mb = 10*1024*1024
244
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
224
        ten_mb = 10 * 1024 * 1024
 
225
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
245
226
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
246
227
                   [(i*ten_mb, ten_mb) for i in range(11)])
247
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
248
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
228
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
229
                   [(i * ten_mb, ten_mb) for i in range(11)],
249
230
                   max_size=1*1024*1024*1024)
250
231
 
251
232
 
256
237
        server.start_server()
257
238
        url = server.get_url()
258
239
        self.assertTrue(url in transport.transport_list_registry)
259
 
        t = transport.get_transport(url)
 
240
        t = transport.get_transport_from_url(url)
260
241
        del t
261
242
        server.stop_server()
262
243
        self.assertFalse(url in transport.transport_list_registry)
377
358
    def test_abspath(self):
378
359
        # The abspath is always relative to the chroot_url.
379
360
        server = chroot.ChrootServer(
380
 
            transport.get_transport('memory:///foo/bar/'))
 
361
            transport.get_transport_from_url('memory:///foo/bar/'))
381
362
        self.start_server(server)
382
 
        t = transport.get_transport(server.get_url())
 
363
        t = transport.get_transport_from_url(server.get_url())
383
364
        self.assertEqual(server.get_url(), t.abspath('/'))
384
365
 
385
366
        subdir_t = t.clone('subdir')
387
368
 
388
369
    def test_clone(self):
389
370
        server = chroot.ChrootServer(
390
 
            transport.get_transport('memory:///foo/bar/'))
 
371
            transport.get_transport_from_url('memory:///foo/bar/'))
391
372
        self.start_server(server)
392
 
        t = transport.get_transport(server.get_url())
 
373
        t = transport.get_transport_from_url(server.get_url())
393
374
        # relpath from root and root path are the same
394
375
        relpath_cloned = t.clone('foo')
395
376
        abspath_cloned = t.clone('/foo')
404
385
        This is so that it is not possible to escape a chroot by doing::
405
386
            url = chroot_transport.base
406
387
            parent_url = urlutils.join(url, '..')
407
 
            new_t = transport.get_transport(parent_url)
 
388
            new_t = transport.get_transport_from_url(parent_url)
408
389
        """
409
390
        server = chroot.ChrootServer(
410
 
            transport.get_transport('memory:///path/subpath'))
 
391
            transport.get_transport_from_url('memory:///path/subpath'))
411
392
        self.start_server(server)
412
 
        t = transport.get_transport(server.get_url())
413
 
        new_t = transport.get_transport(t.base)
 
393
        t = transport.get_transport_from_url(server.get_url())
 
394
        new_t = transport.get_transport_from_url(t.base)
414
395
        self.assertEqual(t.server, new_t.server)
415
396
        self.assertEqual(t.base, new_t.base)
416
397
 
421
402
        This is so that it is not possible to escape a chroot by doing::
422
403
            url = chroot_transport.base
423
404
            parent_url = urlutils.join(url, '..')
424
 
            new_t = transport.get_transport(parent_url)
 
405
            new_t = transport.get_transport_from_url(parent_url)
425
406
        """
426
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
407
        server = chroot.ChrootServer(
 
408
            transport.get_transport_from_url('memory:///path/'))
427
409
        self.start_server(server)
428
 
        t = transport.get_transport(server.get_url())
 
410
        t = transport.get_transport_from_url(server.get_url())
429
411
        self.assertRaises(
430
412
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
431
413
 
441
423
        backing_transport = memory.MemoryTransport()
442
424
        server = chroot.ChrootServer(backing_transport)
443
425
        server.start_server()
444
 
        try:
445
 
            self.assertTrue(server.scheme
446
 
                            in transport._get_protocol_handlers().keys())
447
 
        finally:
448
 
            server.stop_server()
 
426
        self.addCleanup(server.stop_server)
 
427
        self.assertTrue(server.scheme
 
428
                        in transport._get_protocol_handlers().keys())
449
429
 
450
430
    def test_stop_server(self):
451
431
        backing_transport = memory.MemoryTransport()
459
439
        backing_transport = memory.MemoryTransport()
460
440
        server = chroot.ChrootServer(backing_transport)
461
441
        server.start_server()
462
 
        try:
463
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
464
 
        finally:
465
 
            server.stop_server()
 
442
        self.addCleanup(server.stop_server)
 
443
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
466
444
 
467
445
 
468
446
class PathFilteringDecoratorTransportTest(tests.TestCase):
471
449
    def test_abspath(self):
472
450
        # The abspath is always relative to the base of the backing transport.
473
451
        server = pathfilter.PathFilteringServer(
474
 
            transport.get_transport('memory:///foo/bar/'),
 
452
            transport.get_transport_from_url('memory:///foo/bar/'),
475
453
            lambda x: x)
476
454
        server.start_server()
477
 
        t = transport.get_transport(server.get_url())
 
455
        t = transport.get_transport_from_url(server.get_url())
478
456
        self.assertEqual(server.get_url(), t.abspath('/'))
479
457
 
480
458
        subdir_t = t.clone('subdir')
483
461
 
484
462
    def make_pf_transport(self, filter_func=None):
485
463
        """Make a PathFilteringTransport backed by a MemoryTransport.
486
 
        
 
464
 
487
465
        :param filter_func: by default this will be a no-op function.  Use this
488
466
            parameter to override it."""
489
467
        if filter_func is None:
490
468
            filter_func = lambda x: x
491
469
        server = pathfilter.PathFilteringServer(
492
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
470
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
493
471
        server.start_server()
494
472
        self.addCleanup(server.stop_server)
495
 
        return transport.get_transport(server.get_url())
 
473
        return transport.get_transport_from_url(server.get_url())
496
474
 
497
475
    def test__filter(self):
498
476
        # _filter (with an identity func as filter_func) always returns
511
489
 
512
490
    def test_filter_invocation(self):
513
491
        filter_log = []
 
492
 
514
493
        def filter(path):
515
494
            filter_log.append(path)
516
495
            return path
541
520
        otherwise) the filtering by doing::
542
521
            url = filtered_transport.base
543
522
            parent_url = urlutils.join(url, '..')
544
 
            new_t = transport.get_transport(parent_url)
 
523
            new_t = transport.get_transport_from_url(parent_url)
545
524
        """
546
525
        t = self.make_pf_transport()
547
 
        new_t = transport.get_transport(t.base)
 
526
        new_t = transport.get_transport_from_url(t.base)
548
527
        self.assertEqual(t.server, new_t.server)
549
528
        self.assertEqual(t.base, new_t.base)
550
529
 
563
542
        # connect to '.' via http which is not listable
564
543
        server = HttpServer()
565
544
        self.start_server(server)
566
 
        t = transport.get_transport('readonly+' + server.get_url())
567
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
545
        t = transport.get_transport_from_url('readonly+' + server.get_url())
 
546
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
568
547
        self.assertEqual(False, t.listable())
569
548
        self.assertEqual(True, t.is_readonly())
570
549
 
602
581
        # the url should be decorated appropriately
603
582
        self.assertStartsWith(server.get_url(), 'fakenfs+')
604
583
        # and we should be able to get a transport for it
605
 
        t = transport.get_transport(server.get_url())
 
584
        t = transport.get_transport_from_url(server.get_url())
606
585
        # which must be a FakeNFSTransportDecorator instance.
607
586
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
608
587
 
685
664
        base_url = self._server.get_url()
686
665
        url = self._adjust_url(base_url, relpath)
687
666
        # try getting the transport via the regular interface:
688
 
        t = transport.get_transport(url)
 
667
        t = transport.get_transport_from_url(url)
689
668
        # vila--20070607 if the following are commented out the test suite
690
669
        # still pass. Is this really still needed or was it a forgotten
691
670
        # temporary fix ?
696
675
        return t
697
676
 
698
677
 
 
678
class TestTransportFromPath(tests.TestCaseInTempDir):
 
679
 
 
680
    def test_with_path(self):
 
681
        t = transport.get_transport_from_path(self.test_dir)
 
682
        self.assertIsInstance(t, local.LocalTransport)
 
683
        self.assertEquals(t.base.rstrip("/"),
 
684
            urlutils.local_path_to_url(self.test_dir))
 
685
 
 
686
    def test_with_url(self):
 
687
        t = transport.get_transport_from_path("file:")
 
688
        self.assertIsInstance(t, local.LocalTransport)
 
689
        self.assertEquals(t.base.rstrip("/"),
 
690
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
691
 
 
692
 
 
693
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
694
 
 
695
    def test_with_path(self):
 
696
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
697
            self.test_dir)
 
698
 
 
699
    def test_with_url(self):
 
700
        url = urlutils.local_path_to_url(self.test_dir)
 
701
        t = transport.get_transport_from_url(url)
 
702
        self.assertIsInstance(t, local.LocalTransport)
 
703
        self.assertEquals(t.base.rstrip("/"), url)
 
704
 
 
705
    def test_with_url_and_segment_parameters(self):
 
706
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
 
707
        t = transport.get_transport_from_url(url)
 
708
        self.assertIsInstance(t, local.LocalTransport)
 
709
        self.assertEquals(t.base.rstrip("/"), url)
 
710
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
 
711
            f.write("data")
 
712
        self.assertTrue(t.has("afile"))
 
713
 
 
714
 
699
715
class TestLocalTransports(tests.TestCase):
700
716
 
701
717
    def test_get_transport_from_abspath(self):
723
739
        self.assertEquals(t.local_abspath(''), here)
724
740
 
725
741
 
 
742
class TestLocalTransportMutation(tests.TestCaseInTempDir):
 
743
 
 
744
    def test_local_transport_mkdir(self):
 
745
        here = osutils.abspath('.')
 
746
        t = transport.get_transport(here)
 
747
        t.mkdir('test')
 
748
        self.assertTrue(os.path.exists('test'))
 
749
 
 
750
    def test_local_transport_mkdir_permission_denied(self):
 
751
        # See https://bugs.launchpad.net/bzr/+bug/606537
 
752
        here = osutils.abspath('.')
 
753
        t = transport.get_transport(here)
 
754
        def fake_chmod(path, mode):
 
755
            e = OSError('permission denied')
 
756
            e.errno = errno.EPERM
 
757
            raise e
 
758
        self.overrideAttr(os, 'chmod', fake_chmod)
 
759
        t.mkdir('test')
 
760
        t.mkdir('test2', mode=0707)
 
761
        self.assertTrue(os.path.exists('test'))
 
762
        self.assertTrue(os.path.exists('test2'))
 
763
 
 
764
 
 
765
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
 
766
 
 
767
    def test_local_fdatasync_calls_fdatasync(self):
 
768
        """Check fdatasync on a stream tries to flush the data to the OS.
 
769
        
 
770
        We can't easily observe the external effect but we can at least see
 
771
        it's called.
 
772
        """
 
773
        sentinel = object()
 
774
        fdatasync = getattr(os, 'fdatasync', sentinel)
 
775
        if fdatasync is sentinel:
 
776
            raise tests.TestNotApplicable('fdatasync not supported')
 
777
        t = self.get_transport('.')
 
778
        calls = self.recordCalls(os, 'fdatasync')
 
779
        w = t.open_write_stream('out')
 
780
        w.write('foo')
 
781
        w.fdatasync()
 
782
        with open('out', 'rb') as f:
 
783
            # Should have been flushed.
 
784
            self.assertEquals(f.read(), 'foo')
 
785
        self.assertEquals(len(calls), 1, calls)
 
786
 
 
787
    def test_missing_directory(self):
 
788
        t = self.get_transport('.')
 
789
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
 
790
 
 
791
 
726
792
class TestWin32LocalTransport(tests.TestCase):
727
793
 
728
794
    def test_unc_clone_to_root(self):
744
810
    def test_parse_url(self):
745
811
        t = transport.ConnectedTransport(
746
812
            'http://simple.example.com/home/source')
747
 
        self.assertEquals(t._host, 'simple.example.com')
748
 
        self.assertEquals(t._port, None)
749
 
        self.assertEquals(t._path, '/home/source/')
750
 
        self.failUnless(t._user is None)
751
 
        self.failUnless(t._password is None)
 
813
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
 
814
        self.assertEquals(t._parsed_url.port, None)
 
815
        self.assertEquals(t._parsed_url.path, '/home/source/')
 
816
        self.assertTrue(t._parsed_url.user is None)
 
817
        self.assertTrue(t._parsed_url.password is None)
752
818
 
753
819
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
754
820
 
755
821
    def test_parse_url_with_at_in_user(self):
756
822
        # Bug 228058
757
823
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
758
 
        self.assertEquals(t._user, 'user@host.com')
 
824
        self.assertEquals(t._parsed_url.user, 'user@host.com')
759
825
 
760
826
    def test_parse_quoted_url(self):
761
827
        t = transport.ConnectedTransport(
762
828
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
763
 
        self.assertEquals(t._host, 'exAmple.com')
764
 
        self.assertEquals(t._port, 2222)
765
 
        self.assertEquals(t._user, 'robey')
766
 
        self.assertEquals(t._password, 'h@t')
767
 
        self.assertEquals(t._path, '/path/')
 
829
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
 
830
        self.assertEquals(t._parsed_url.port, 2222)
 
831
        self.assertEquals(t._parsed_url.user, 'robey')
 
832
        self.assertEquals(t._parsed_url.password, 'h@t')
 
833
        self.assertEquals(t._parsed_url.path, '/path/')
768
834
 
769
835
        # Base should not keep track of the password
770
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
836
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
771
837
 
772
838
    def test_parse_invalid_url(self):
773
839
        self.assertRaises(errors.InvalidURL,
777
843
    def test_relpath(self):
778
844
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
779
845
 
780
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
846
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
847
            'sub')
781
848
        self.assertRaises(errors.PathNotChild, t.relpath,
782
849
                          'http://user@host.com/abs/path/sub')
783
850
        self.assertRaises(errors.PathNotChild, t.relpath,
796
863
 
797
864
    def test_connection_sharing_propagate_credentials(self):
798
865
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
799
 
        self.assertEquals('user', t._user)
800
 
        self.assertEquals('host.com', t._host)
 
866
        self.assertEquals('user', t._parsed_url.user)
 
867
        self.assertEquals('host.com', t._parsed_url.host)
801
868
        self.assertIs(None, t._get_connection())
802
 
        self.assertIs(None, t._password)
 
869
        self.assertIs(None, t._parsed_url.password)
803
870
        c = t.clone('subdir')
804
871
        self.assertIs(None, c._get_connection())
805
 
        self.assertIs(None, t._password)
 
872
        self.assertIs(None, t._parsed_url.password)
806
873
 
807
874
        # Simulate the user entering a password
808
875
        password = 'secret'
827
894
 
828
895
    def test_reuse_same_transport(self):
829
896
        possible_transports = []
830
 
        t1 = transport.get_transport('http://foo/',
 
897
        t1 = transport.get_transport_from_url('http://foo/',
831
898
                                     possible_transports=possible_transports)
832
899
        self.assertEqual([t1], possible_transports)
833
 
        t2 = transport.get_transport('http://foo/',
 
900
        t2 = transport.get_transport_from_url('http://foo/',
834
901
                                     possible_transports=[t1])
835
902
        self.assertIs(t1, t2)
836
903
 
837
904
        # Also check that final '/' are handled correctly
838
 
        t3 = transport.get_transport('http://foo/path/')
839
 
        t4 = transport.get_transport('http://foo/path',
 
905
        t3 = transport.get_transport_from_url('http://foo/path/')
 
906
        t4 = transport.get_transport_from_url('http://foo/path',
840
907
                                     possible_transports=[t3])
841
908
        self.assertIs(t3, t4)
842
909
 
843
 
        t5 = transport.get_transport('http://foo/path')
844
 
        t6 = transport.get_transport('http://foo/path/',
 
910
        t5 = transport.get_transport_from_url('http://foo/path')
 
911
        t6 = transport.get_transport_from_url('http://foo/path/',
845
912
                                     possible_transports=[t5])
846
913
        self.assertIs(t5, t6)
847
914
 
848
915
    def test_don_t_reuse_different_transport(self):
849
 
        t1 = transport.get_transport('http://foo/path')
850
 
        t2 = transport.get_transport('http://bar/path',
 
916
        t1 = transport.get_transport_from_url('http://foo/path')
 
917
        t2 = transport.get_transport_from_url('http://bar/path',
851
918
                                     possible_transports=[t1])
852
919
        self.assertIsNot(t1, t2)
853
920
 
854
921
 
855
922
class TestTransportTrace(tests.TestCase):
856
923
 
857
 
    def test_get(self):
858
 
        t = transport.get_transport('trace+memory://')
859
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
924
    def test_decorator(self):
 
925
        t = transport.get_transport_from_url('trace+memory://')
 
926
        self.assertIsInstance(
 
927
            t, bzrlib.transport.trace.TransportTraceDecorator)
860
928
 
861
929
    def test_clone_preserves_activity(self):
862
 
        t = transport.get_transport('trace+memory://')
 
930
        t = transport.get_transport_from_url('trace+memory://')
863
931
        t2 = t.clone('.')
864
932
        self.assertTrue(t is not t2)
865
933
        self.assertTrue(t._activity is t2._activity)
869
937
    # still won't cause a test failure when the top level Transport API
870
938
    # changes; so there is little return doing that.
871
939
    def test_get(self):
872
 
        t = transport.get_transport('trace+memory:///')
 
940
        t = transport.get_transport_from_url('trace+memory:///')
873
941
        t.put_bytes('foo', 'barish')
874
942
        t.get('foo')
875
943
        expected_result = []
881
949
        self.assertEqual(expected_result, t._activity)
882
950
 
883
951
    def test_readv(self):
884
 
        t = transport.get_transport('trace+memory:///')
 
952
        t = transport.get_transport_from_url('trace+memory:///')
885
953
        t.put_bytes('foo', 'barish')
886
954
        list(t.readv('foo', [(0, 1), (3, 2)],
887
955
                     adjust_for_latency=True, upper_limit=6))
897
965
class TestSSHConnections(tests.TestCaseWithTransport):
898
966
 
899
967
    def test_bzr_connect_to_bzr_ssh(self):
900
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
968
        """get_transport of a bzr+ssh:// behaves correctly.
901
969
 
902
970
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
903
971
        """
919
987
        # SSH channel ourselves.  Surely this has already been implemented
920
988
        # elsewhere?
921
989
        started = []
 
990
 
922
991
        class StubSSHServer(stub_sftp.StubServer):
923
992
 
924
993
            test = self
930
999
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
931
1000
 
932
1001
                # XXX: horribly inefficient, not to mention ugly.
933
 
                # Start a thread for each of stdin/out/err, and relay bytes from
934
 
                # the subprocess to channel and vice versa.
 
1002
                # Start a thread for each of stdin/out/err, and relay bytes
 
1003
                # from the subprocess to channel and vice versa.
935
1004
                def ferry_bytes(read, write, close):
936
1005
                    while True:
937
1006
                        bytes = read(1)
967
1036
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
968
1037
        else:
969
1038
            bzr_remote_path = self.get_bzr_path()
970
 
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
1039
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
971
1040
 
972
1041
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
973
1042
        # variable is used to tell bzr what command to run on the remote end.
1005
1074
        result = http.unhtml_roughly(fake_html)
1006
1075
        self.assertEquals(len(result), 1000)
1007
1076
        self.assertStartsWith(result, " something!")
 
1077
 
 
1078
 
 
1079
class SomeDirectory(object):
 
1080
 
 
1081
    def look_up(self, name, url):
 
1082
        return "http://bar"
 
1083
 
 
1084
 
 
1085
class TestLocationToUrl(tests.TestCase):
 
1086
 
 
1087
    def get_base_location(self):
 
1088
        path = osutils.abspath('/foo/bar')
 
1089
        if path.startswith('/'):
 
1090
            url = 'file://%s' % (path,)
 
1091
        else:
 
1092
            # On Windows, abspaths start with the drive letter, so we have to
 
1093
            # add in the extra '/'
 
1094
            url = 'file:///%s' % (path,)
 
1095
        return path, url
 
1096
 
 
1097
    def test_regular_url(self):
 
1098
        self.assertEquals("file://foo", location_to_url("file://foo"))
 
1099
 
 
1100
    def test_directory(self):
 
1101
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1102
        self.addCleanup(directories.remove, "bar:")
 
1103
        self.assertEquals("http://bar", location_to_url("bar:"))
 
1104
 
 
1105
    def test_unicode_url(self):
 
1106
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1107
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1108
 
 
1109
    def test_unicode_path(self):
 
1110
        path, url = self.get_base_location()
 
1111
        location = path + "\xc3\xaf".decode("utf-8")
 
1112
        url += '%C3%AF'
 
1113
        self.assertEquals(url, location_to_url(location))
 
1114
 
 
1115
    def test_path(self):
 
1116
        path, url = self.get_base_location()
 
1117
        self.assertEquals(url, location_to_url(path))
 
1118
 
 
1119
    def test_relative_file_url(self):
 
1120
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
 
1121
            location_to_url("file:bar"))
 
1122
 
 
1123
    def test_absolute_file_url(self):
 
1124
        self.assertEquals("file:///bar", location_to_url("file:/bar"))