~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Vincent Ladeuil
  • Date: 2011-12-21 14:25:26 UTC
  • mto: This revision was merged to the branch mainline in revision 6397.
  • Revision ID: v.ladeuil+lp@free.fr-20111221142526-pnwau0xnalimujts
Provides MemoryStack to simplify configuration setup in tests

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import errno
19
20
import os
20
21
import subprocess
21
22
import sys
28
29
    transport,
29
30
    urlutils,
30
31
    )
 
32
from bzrlib.directory_service import directories
31
33
from bzrlib.transport import (
32
34
    chroot,
33
35
    fakenfs,
 
36
    http,
34
37
    local,
 
38
    location_to_url,
35
39
    memory,
36
40
    pathfilter,
37
41
    readonly,
38
42
    )
 
43
import bzrlib.transport.trace
39
44
from bzrlib.tests import (
40
45
    features,
41
46
    test_server,
48
53
class TestTransport(tests.TestCase):
49
54
    """Test the non transport-concrete class functionality."""
50
55
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
56
    def test__get_set_protocol_handlers(self):
55
57
        handlers = transport._get_protocol_handlers()
56
 
        self.assertNotEqual([], handlers.keys( ))
57
 
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
60
 
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
58
        self.assertNotEqual([], handlers.keys())
 
59
        transport._clear_protocol_handlers()
 
60
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
61
        self.assertEqual([], transport._get_protocol_handlers().keys())
62
62
 
63
63
    def test_get_transport_modules(self):
64
64
        handlers = transport._get_protocol_handlers()
 
65
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
66
        # don't pollute the current handlers
66
67
        transport._clear_protocol_handlers()
 
68
 
67
69
        class SampleHandler(object):
68
70
            """I exist, isnt that enough?"""
69
 
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
79
 
            self.assertEqual([SampleHandler.__module__,
80
 
                              'bzrlib.transport.chroot',
81
 
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
83
 
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
71
        transport._clear_protocol_handlers()
 
72
        transport.register_transport_proto('foo')
 
73
        transport.register_lazy_transport('foo',
 
74
                                            'bzrlib.tests.test_transport',
 
75
                                            'TestTransport.SampleHandler')
 
76
        transport.register_transport_proto('bar')
 
77
        transport.register_lazy_transport('bar',
 
78
                                            'bzrlib.tests.test_transport',
 
79
                                            'TestTransport.SampleHandler')
 
80
        self.assertEqual([SampleHandler.__module__,
 
81
                            'bzrlib.transport.chroot',
 
82
                            'bzrlib.transport.pathfilter'],
 
83
                            transport._get_transport_modules())
85
84
 
86
85
    def test_transport_dependency(self):
87
86
        """Transport with missing dependency causes no error"""
88
87
        saved_handlers = transport._get_protocol_handlers()
 
88
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
89
        # don't pollute the current handlers
90
90
        transport._clear_protocol_handlers()
 
91
        transport.register_transport_proto('foo')
 
92
        transport.register_lazy_transport(
 
93
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
91
94
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
95
 
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
98
 
                e_str = str(e)
99
 
                self.assertEquals('Unsupported protocol'
100
 
                                  ' for url "foo://fooserver/foo":'
101
 
                                  ' Unable to import library "some_lib":'
102
 
                                  ' testing missing dependency', str(e))
103
 
            else:
104
 
                self.fail('Did not raise UnsupportedProtocol')
105
 
        finally:
106
 
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
95
            transport.get_transport_from_url('foo://fooserver/foo')
 
96
        except errors.UnsupportedProtocol, e:
 
97
            e_str = str(e)
 
98
            self.assertEquals('Unsupported protocol'
 
99
                                ' for url "foo://fooserver/foo":'
 
100
                                ' Unable to import library "some_lib":'
 
101
                                ' testing missing dependency', str(e))
 
102
        else:
 
103
            self.fail('Did not raise UnsupportedProtocol')
108
104
 
109
105
    def test_transport_fallback(self):
110
106
        """Transport with missing dependency causes no error"""
111
107
        saved_handlers = transport._get_protocol_handlers()
112
 
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
120
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
 
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
108
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
109
        transport._clear_protocol_handlers()
 
110
        transport.register_transport_proto('foo')
 
111
        transport.register_lazy_transport(
 
112
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
113
        transport.register_lazy_transport(
 
114
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
115
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
116
        self.assertTrue(isinstance(t, BackupTransportHandler))
123
117
 
124
118
    def test_ssh_hints(self):
125
119
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
126
120
        try:
127
 
            transport.get_transport('ssh://fooserver/foo')
 
121
            transport.get_transport_from_url('ssh://fooserver/foo')
128
122
        except errors.UnsupportedProtocol, e:
129
123
            e_str = str(e)
130
124
            self.assertEquals('Unsupported protocol'
145
139
        self.assertRaises(errors.ReadError, a_file.read, 40)
146
140
        a_file.close()
147
141
 
148
 
    def test__combine_paths(self):
149
 
        t = transport.Transport('/')
150
 
        self.assertEqual('/home/sarah/project/foo',
151
 
                         t._combine_paths('/home/sarah', 'project/foo'))
152
 
        self.assertEqual('/etc',
153
 
                         t._combine_paths('/home/sarah', '../../etc'))
154
 
        self.assertEqual('/etc',
155
 
                         t._combine_paths('/home/sarah', '../../../etc'))
156
 
        self.assertEqual('/etc',
157
 
                         t._combine_paths('/home/sarah', '/etc'))
158
 
 
159
142
    def test_local_abspath_non_local_transport(self):
160
143
        # the base implementation should throw
161
144
        t = memory.MemoryTransport()
218
201
 
219
202
    def test_coalesce_fudge(self):
220
203
        self.check([(10, 30, [(0, 10), (20, 10)]),
221
 
                    (100, 10, [(0, 10),]),
 
204
                    (100, 10, [(0, 10)]),
222
205
                   ], [(10, 10), (30, 10), (100, 10)],
223
 
                   fudge=10
224
 
                  )
 
206
                   fudge=10)
 
207
 
225
208
    def test_coalesce_max_size(self):
226
209
        self.check([(10, 20, [(0, 10), (10, 10)]),
227
210
                    (30, 50, [(0, 50)]),
228
211
                    # If one range is above max_size, it gets its own coalesced
229
212
                    # offset
230
 
                    (100, 80, [(0, 80),]),],
 
213
                    (100, 80, [(0, 80)]),],
231
214
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
232
 
                   max_size=50
233
 
                  )
 
215
                   max_size=50)
234
216
 
235
217
    def test_coalesce_no_max_size(self):
236
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
218
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
237
219
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
238
220
                  )
239
221
 
240
222
    def test_coalesce_default_limit(self):
241
223
        # By default we use a 100MB max size.
242
 
        ten_mb = 10*1024*1024
243
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
224
        ten_mb = 10 * 1024 * 1024
 
225
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
244
226
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
245
227
                   [(i*ten_mb, ten_mb) for i in range(11)])
246
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
247
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
228
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
229
                   [(i * ten_mb, ten_mb) for i in range(11)],
248
230
                   max_size=1*1024*1024*1024)
249
231
 
250
232
 
255
237
        server.start_server()
256
238
        url = server.get_url()
257
239
        self.assertTrue(url in transport.transport_list_registry)
258
 
        t = transport.get_transport(url)
 
240
        t = transport.get_transport_from_url(url)
259
241
        del t
260
242
        server.stop_server()
261
243
        self.assertFalse(url in transport.transport_list_registry)
376
358
    def test_abspath(self):
377
359
        # The abspath is always relative to the chroot_url.
378
360
        server = chroot.ChrootServer(
379
 
            transport.get_transport('memory:///foo/bar/'))
 
361
            transport.get_transport_from_url('memory:///foo/bar/'))
380
362
        self.start_server(server)
381
 
        t = transport.get_transport(server.get_url())
 
363
        t = transport.get_transport_from_url(server.get_url())
382
364
        self.assertEqual(server.get_url(), t.abspath('/'))
383
365
 
384
366
        subdir_t = t.clone('subdir')
386
368
 
387
369
    def test_clone(self):
388
370
        server = chroot.ChrootServer(
389
 
            transport.get_transport('memory:///foo/bar/'))
 
371
            transport.get_transport_from_url('memory:///foo/bar/'))
390
372
        self.start_server(server)
391
 
        t = transport.get_transport(server.get_url())
 
373
        t = transport.get_transport_from_url(server.get_url())
392
374
        # relpath from root and root path are the same
393
375
        relpath_cloned = t.clone('foo')
394
376
        abspath_cloned = t.clone('/foo')
403
385
        This is so that it is not possible to escape a chroot by doing::
404
386
            url = chroot_transport.base
405
387
            parent_url = urlutils.join(url, '..')
406
 
            new_t = transport.get_transport(parent_url)
 
388
            new_t = transport.get_transport_from_url(parent_url)
407
389
        """
408
390
        server = chroot.ChrootServer(
409
 
            transport.get_transport('memory:///path/subpath'))
 
391
            transport.get_transport_from_url('memory:///path/subpath'))
410
392
        self.start_server(server)
411
 
        t = transport.get_transport(server.get_url())
412
 
        new_t = transport.get_transport(t.base)
 
393
        t = transport.get_transport_from_url(server.get_url())
 
394
        new_t = transport.get_transport_from_url(t.base)
413
395
        self.assertEqual(t.server, new_t.server)
414
396
        self.assertEqual(t.base, new_t.base)
415
397
 
420
402
        This is so that it is not possible to escape a chroot by doing::
421
403
            url = chroot_transport.base
422
404
            parent_url = urlutils.join(url, '..')
423
 
            new_t = transport.get_transport(parent_url)
 
405
            new_t = transport.get_transport_from_url(parent_url)
424
406
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
407
        server = chroot.ChrootServer(
 
408
            transport.get_transport_from_url('memory:///path/'))
426
409
        self.start_server(server)
427
 
        t = transport.get_transport(server.get_url())
 
410
        t = transport.get_transport_from_url(server.get_url())
428
411
        self.assertRaises(
429
412
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
430
413
 
440
423
        backing_transport = memory.MemoryTransport()
441
424
        server = chroot.ChrootServer(backing_transport)
442
425
        server.start_server()
443
 
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
446
 
        finally:
447
 
            server.stop_server()
 
426
        self.addCleanup(server.stop_server)
 
427
        self.assertTrue(server.scheme
 
428
                        in transport._get_protocol_handlers().keys())
448
429
 
449
430
    def test_stop_server(self):
450
431
        backing_transport = memory.MemoryTransport()
458
439
        backing_transport = memory.MemoryTransport()
459
440
        server = chroot.ChrootServer(backing_transport)
460
441
        server.start_server()
461
 
        try:
462
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
463
 
        finally:
464
 
            server.stop_server()
 
442
        self.addCleanup(server.stop_server)
 
443
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
465
444
 
466
445
 
467
446
class PathFilteringDecoratorTransportTest(tests.TestCase):
470
449
    def test_abspath(self):
471
450
        # The abspath is always relative to the base of the backing transport.
472
451
        server = pathfilter.PathFilteringServer(
473
 
            transport.get_transport('memory:///foo/bar/'),
 
452
            transport.get_transport_from_url('memory:///foo/bar/'),
474
453
            lambda x: x)
475
454
        server.start_server()
476
 
        t = transport.get_transport(server.get_url())
 
455
        t = transport.get_transport_from_url(server.get_url())
477
456
        self.assertEqual(server.get_url(), t.abspath('/'))
478
457
 
479
458
        subdir_t = t.clone('subdir')
482
461
 
483
462
    def make_pf_transport(self, filter_func=None):
484
463
        """Make a PathFilteringTransport backed by a MemoryTransport.
485
 
        
 
464
 
486
465
        :param filter_func: by default this will be a no-op function.  Use this
487
466
            parameter to override it."""
488
467
        if filter_func is None:
489
468
            filter_func = lambda x: x
490
469
        server = pathfilter.PathFilteringServer(
491
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
470
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
492
471
        server.start_server()
493
472
        self.addCleanup(server.stop_server)
494
 
        return transport.get_transport(server.get_url())
 
473
        return transport.get_transport_from_url(server.get_url())
495
474
 
496
475
    def test__filter(self):
497
476
        # _filter (with an identity func as filter_func) always returns
510
489
 
511
490
    def test_filter_invocation(self):
512
491
        filter_log = []
 
492
 
513
493
        def filter(path):
514
494
            filter_log.append(path)
515
495
            return path
540
520
        otherwise) the filtering by doing::
541
521
            url = filtered_transport.base
542
522
            parent_url = urlutils.join(url, '..')
543
 
            new_t = transport.get_transport(parent_url)
 
523
            new_t = transport.get_transport_from_url(parent_url)
544
524
        """
545
525
        t = self.make_pf_transport()
546
 
        new_t = transport.get_transport(t.base)
 
526
        new_t = transport.get_transport_from_url(t.base)
547
527
        self.assertEqual(t.server, new_t.server)
548
528
        self.assertEqual(t.base, new_t.base)
549
529
 
562
542
        # connect to '.' via http which is not listable
563
543
        server = HttpServer()
564
544
        self.start_server(server)
565
 
        t = transport.get_transport('readonly+' + server.get_url())
566
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
545
        t = transport.get_transport_from_url('readonly+' + server.get_url())
 
546
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
567
547
        self.assertEqual(False, t.listable())
568
548
        self.assertEqual(True, t.is_readonly())
569
549
 
601
581
        # the url should be decorated appropriately
602
582
        self.assertStartsWith(server.get_url(), 'fakenfs+')
603
583
        # and we should be able to get a transport for it
604
 
        t = transport.get_transport(server.get_url())
 
584
        t = transport.get_transport_from_url(server.get_url())
605
585
        # which must be a FakeNFSTransportDecorator instance.
606
586
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
607
587
 
684
664
        base_url = self._server.get_url()
685
665
        url = self._adjust_url(base_url, relpath)
686
666
        # try getting the transport via the regular interface:
687
 
        t = transport.get_transport(url)
 
667
        t = transport.get_transport_from_url(url)
688
668
        # vila--20070607 if the following are commented out the test suite
689
669
        # still pass. Is this really still needed or was it a forgotten
690
670
        # temporary fix ?
695
675
        return t
696
676
 
697
677
 
 
678
class TestTransportFromPath(tests.TestCaseInTempDir):
 
679
 
 
680
    def test_with_path(self):
 
681
        t = transport.get_transport_from_path(self.test_dir)
 
682
        self.assertIsInstance(t, local.LocalTransport)
 
683
        self.assertEquals(t.base.rstrip("/"),
 
684
            urlutils.local_path_to_url(self.test_dir))
 
685
 
 
686
    def test_with_url(self):
 
687
        t = transport.get_transport_from_path("file:")
 
688
        self.assertIsInstance(t, local.LocalTransport)
 
689
        self.assertEquals(t.base.rstrip("/"),
 
690
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
691
 
 
692
 
 
693
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
694
 
 
695
    def test_with_path(self):
 
696
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
697
            self.test_dir)
 
698
 
 
699
    def test_with_url(self):
 
700
        url = urlutils.local_path_to_url(self.test_dir)
 
701
        t = transport.get_transport_from_url(url)
 
702
        self.assertIsInstance(t, local.LocalTransport)
 
703
        self.assertEquals(t.base.rstrip("/"), url)
 
704
 
 
705
    def test_with_url_and_segment_parameters(self):
 
706
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
 
707
        t = transport.get_transport_from_url(url)
 
708
        self.assertIsInstance(t, local.LocalTransport)
 
709
        self.assertEquals(t.base.rstrip("/"), url)
 
710
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
 
711
            f.write("data")
 
712
        self.assertTrue(t.has("afile"))
 
713
 
 
714
 
698
715
class TestLocalTransports(tests.TestCase):
699
716
 
700
717
    def test_get_transport_from_abspath(self):
722
739
        self.assertEquals(t.local_abspath(''), here)
723
740
 
724
741
 
 
742
class TestLocalTransportMutation(tests.TestCaseInTempDir):
 
743
 
 
744
    def test_local_transport_mkdir(self):
 
745
        here = osutils.abspath('.')
 
746
        t = transport.get_transport(here)
 
747
        t.mkdir('test')
 
748
        self.assertTrue(os.path.exists('test'))
 
749
 
 
750
    def test_local_transport_mkdir_permission_denied(self):
 
751
        # See https://bugs.launchpad.net/bzr/+bug/606537
 
752
        here = osutils.abspath('.')
 
753
        t = transport.get_transport(here)
 
754
        def fake_chmod(path, mode):
 
755
            e = OSError('permission denied')
 
756
            e.errno = errno.EPERM
 
757
            raise e
 
758
        self.overrideAttr(os, 'chmod', fake_chmod)
 
759
        t.mkdir('test')
 
760
        t.mkdir('test2', mode=0707)
 
761
        self.assertTrue(os.path.exists('test'))
 
762
        self.assertTrue(os.path.exists('test2'))
 
763
 
 
764
 
 
765
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
 
766
 
 
767
    def test_local_fdatasync_calls_fdatasync(self):
 
768
        """Check fdatasync on a stream tries to flush the data to the OS.
 
769
        
 
770
        We can't easily observe the external effect but we can at least see
 
771
        it's called.
 
772
        """
 
773
        sentinel = object()
 
774
        fdatasync = getattr(os, 'fdatasync', sentinel)
 
775
        if fdatasync is sentinel:
 
776
            raise tests.TestNotApplicable('fdatasync not supported')
 
777
        t = self.get_transport('.')
 
778
        calls = self.recordCalls(os, 'fdatasync')
 
779
        w = t.open_write_stream('out')
 
780
        w.write('foo')
 
781
        w.fdatasync()
 
782
        with open('out', 'rb') as f:
 
783
            # Should have been flushed.
 
784
            self.assertEquals(f.read(), 'foo')
 
785
        self.assertEquals(len(calls), 1, calls)
 
786
 
 
787
    def test_missing_directory(self):
 
788
        t = self.get_transport('.')
 
789
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
 
790
 
 
791
 
725
792
class TestWin32LocalTransport(tests.TestCase):
726
793
 
727
794
    def test_unc_clone_to_root(self):
743
810
    def test_parse_url(self):
744
811
        t = transport.ConnectedTransport(
745
812
            'http://simple.example.com/home/source')
746
 
        self.assertEquals(t._host, 'simple.example.com')
747
 
        self.assertEquals(t._port, None)
748
 
        self.assertEquals(t._path, '/home/source/')
749
 
        self.failUnless(t._user is None)
750
 
        self.failUnless(t._password is None)
 
813
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
 
814
        self.assertEquals(t._parsed_url.port, None)
 
815
        self.assertEquals(t._parsed_url.path, '/home/source/')
 
816
        self.assertTrue(t._parsed_url.user is None)
 
817
        self.assertTrue(t._parsed_url.password is None)
751
818
 
752
819
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
753
820
 
754
821
    def test_parse_url_with_at_in_user(self):
755
822
        # Bug 228058
756
823
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
757
 
        self.assertEquals(t._user, 'user@host.com')
 
824
        self.assertEquals(t._parsed_url.user, 'user@host.com')
758
825
 
759
826
    def test_parse_quoted_url(self):
760
827
        t = transport.ConnectedTransport(
761
828
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
762
 
        self.assertEquals(t._host, 'exAmple.com')
763
 
        self.assertEquals(t._port, 2222)
764
 
        self.assertEquals(t._user, 'robey')
765
 
        self.assertEquals(t._password, 'h@t')
766
 
        self.assertEquals(t._path, '/path/')
 
829
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
 
830
        self.assertEquals(t._parsed_url.port, 2222)
 
831
        self.assertEquals(t._parsed_url.user, 'robey')
 
832
        self.assertEquals(t._parsed_url.password, 'h@t')
 
833
        self.assertEquals(t._parsed_url.path, '/path/')
767
834
 
768
835
        # Base should not keep track of the password
769
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
836
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
770
837
 
771
838
    def test_parse_invalid_url(self):
772
839
        self.assertRaises(errors.InvalidURL,
776
843
    def test_relpath(self):
777
844
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
778
845
 
779
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
846
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
847
            'sub')
780
848
        self.assertRaises(errors.PathNotChild, t.relpath,
781
849
                          'http://user@host.com/abs/path/sub')
782
850
        self.assertRaises(errors.PathNotChild, t.relpath,
795
863
 
796
864
    def test_connection_sharing_propagate_credentials(self):
797
865
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
798
 
        self.assertEquals('user', t._user)
799
 
        self.assertEquals('host.com', t._host)
 
866
        self.assertEquals('user', t._parsed_url.user)
 
867
        self.assertEquals('host.com', t._parsed_url.host)
800
868
        self.assertIs(None, t._get_connection())
801
 
        self.assertIs(None, t._password)
 
869
        self.assertIs(None, t._parsed_url.password)
802
870
        c = t.clone('subdir')
803
871
        self.assertIs(None, c._get_connection())
804
 
        self.assertIs(None, t._password)
 
872
        self.assertIs(None, t._parsed_url.password)
805
873
 
806
874
        # Simulate the user entering a password
807
875
        password = 'secret'
826
894
 
827
895
    def test_reuse_same_transport(self):
828
896
        possible_transports = []
829
 
        t1 = transport.get_transport('http://foo/',
 
897
        t1 = transport.get_transport_from_url('http://foo/',
830
898
                                     possible_transports=possible_transports)
831
899
        self.assertEqual([t1], possible_transports)
832
 
        t2 = transport.get_transport('http://foo/',
 
900
        t2 = transport.get_transport_from_url('http://foo/',
833
901
                                     possible_transports=[t1])
834
902
        self.assertIs(t1, t2)
835
903
 
836
904
        # Also check that final '/' are handled correctly
837
 
        t3 = transport.get_transport('http://foo/path/')
838
 
        t4 = transport.get_transport('http://foo/path',
 
905
        t3 = transport.get_transport_from_url('http://foo/path/')
 
906
        t4 = transport.get_transport_from_url('http://foo/path',
839
907
                                     possible_transports=[t3])
840
908
        self.assertIs(t3, t4)
841
909
 
842
 
        t5 = transport.get_transport('http://foo/path')
843
 
        t6 = transport.get_transport('http://foo/path/',
 
910
        t5 = transport.get_transport_from_url('http://foo/path')
 
911
        t6 = transport.get_transport_from_url('http://foo/path/',
844
912
                                     possible_transports=[t5])
845
913
        self.assertIs(t5, t6)
846
914
 
847
915
    def test_don_t_reuse_different_transport(self):
848
 
        t1 = transport.get_transport('http://foo/path')
849
 
        t2 = transport.get_transport('http://bar/path',
 
916
        t1 = transport.get_transport_from_url('http://foo/path')
 
917
        t2 = transport.get_transport_from_url('http://bar/path',
850
918
                                     possible_transports=[t1])
851
919
        self.assertIsNot(t1, t2)
852
920
 
853
921
 
854
922
class TestTransportTrace(tests.TestCase):
855
923
 
856
 
    def test_get(self):
857
 
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
924
    def test_decorator(self):
 
925
        t = transport.get_transport_from_url('trace+memory://')
 
926
        self.assertIsInstance(
 
927
            t, bzrlib.transport.trace.TransportTraceDecorator)
859
928
 
860
929
    def test_clone_preserves_activity(self):
861
 
        t = transport.get_transport('trace+memory://')
 
930
        t = transport.get_transport_from_url('trace+memory://')
862
931
        t2 = t.clone('.')
863
932
        self.assertTrue(t is not t2)
864
933
        self.assertTrue(t._activity is t2._activity)
868
937
    # still won't cause a test failure when the top level Transport API
869
938
    # changes; so there is little return doing that.
870
939
    def test_get(self):
871
 
        t = transport.get_transport('trace+memory:///')
 
940
        t = transport.get_transport_from_url('trace+memory:///')
872
941
        t.put_bytes('foo', 'barish')
873
942
        t.get('foo')
874
943
        expected_result = []
880
949
        self.assertEqual(expected_result, t._activity)
881
950
 
882
951
    def test_readv(self):
883
 
        t = transport.get_transport('trace+memory:///')
 
952
        t = transport.get_transport_from_url('trace+memory:///')
884
953
        t.put_bytes('foo', 'barish')
885
954
        list(t.readv('foo', [(0, 1), (3, 2)],
886
955
                     adjust_for_latency=True, upper_limit=6))
896
965
class TestSSHConnections(tests.TestCaseWithTransport):
897
966
 
898
967
    def test_bzr_connect_to_bzr_ssh(self):
899
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
968
        """get_transport of a bzr+ssh:// behaves correctly.
900
969
 
901
970
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
902
971
        """
918
987
        # SSH channel ourselves.  Surely this has already been implemented
919
988
        # elsewhere?
920
989
        started = []
 
990
 
921
991
        class StubSSHServer(stub_sftp.StubServer):
922
992
 
923
993
            test = self
929
999
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
930
1000
 
931
1001
                # XXX: horribly inefficient, not to mention ugly.
932
 
                # Start a thread for each of stdin/out/err, and relay bytes from
933
 
                # the subprocess to channel and vice versa.
 
1002
                # Start a thread for each of stdin/out/err, and relay bytes
 
1003
                # from the subprocess to channel and vice versa.
934
1004
                def ferry_bytes(read, write, close):
935
1005
                    while True:
936
1006
                        bytes = read(1)
955
1025
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
956
1026
        # We *don't* want to override the default SSH vendor: the detected one
957
1027
        # is the one to use.
 
1028
 
 
1029
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
 
1030
        # inherits from SFTPServer which forces the SSH vendor to
 
1031
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
958
1032
        self.start_server(ssh_server)
959
 
        port = ssh_server._listener.port
 
1033
        port = ssh_server.port
960
1034
 
961
1035
        if sys.platform == 'win32':
962
1036
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
963
1037
        else:
964
1038
            bzr_remote_path = self.get_bzr_path()
965
 
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
1039
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
966
1040
 
967
1041
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
968
1042
        # variable is used to tell bzr what command to run on the remote end.
989
1063
        # And the rest are threads
990
1064
        for t in started[1:]:
991
1065
            t.join()
 
1066
 
 
1067
 
 
1068
class TestUnhtml(tests.TestCase):
 
1069
 
 
1070
    """Tests for unhtml_roughly"""
 
1071
 
 
1072
    def test_truncation(self):
 
1073
        fake_html = "<p>something!\n" * 1000
 
1074
        result = http.unhtml_roughly(fake_html)
 
1075
        self.assertEquals(len(result), 1000)
 
1076
        self.assertStartsWith(result, " something!")
 
1077
 
 
1078
 
 
1079
class SomeDirectory(object):
 
1080
 
 
1081
    def look_up(self, name, url):
 
1082
        return "http://bar"
 
1083
 
 
1084
 
 
1085
class TestLocationToUrl(tests.TestCase):
 
1086
 
 
1087
    def get_base_location(self):
 
1088
        path = osutils.abspath('/foo/bar')
 
1089
        if path.startswith('/'):
 
1090
            url = 'file://%s' % (path,)
 
1091
        else:
 
1092
            # On Windows, abspaths start with the drive letter, so we have to
 
1093
            # add in the extra '/'
 
1094
            url = 'file:///%s' % (path,)
 
1095
        return path, url
 
1096
 
 
1097
    def test_regular_url(self):
 
1098
        self.assertEquals("file://foo", location_to_url("file://foo"))
 
1099
 
 
1100
    def test_directory(self):
 
1101
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1102
        self.addCleanup(directories.remove, "bar:")
 
1103
        self.assertEquals("http://bar", location_to_url("bar:"))
 
1104
 
 
1105
    def test_unicode_url(self):
 
1106
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1107
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1108
 
 
1109
    def test_unicode_path(self):
 
1110
        path, url = self.get_base_location()
 
1111
        location = path + "\xc3\xaf".decode("utf-8")
 
1112
        url += '%C3%AF'
 
1113
        self.assertEquals(url, location_to_url(location))
 
1114
 
 
1115
    def test_path(self):
 
1116
        path, url = self.get_base_location()
 
1117
        self.assertEquals(url, location_to_url(path))
 
1118
 
 
1119
    def test_relative_file_url(self):
 
1120
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
 
1121
            location_to_url("file:bar"))
 
1122
 
 
1123
    def test_absolute_file_url(self):
 
1124
        self.assertEquals("file:///bar", location_to_url("file:/bar"))