~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-07-01 11:45:59 UTC
  • mfrom: (5971.1.87 bzr-gpgme)
  • Revision ID: pqm@pqm.ubuntu.com-20110701114559-gshz8uv6hr0x0pax
(jr) Various small improvements to GPG support
 - improve LoopbackGPGStrategy doc string - use unicode strings for UI
 - add doc string to verify_signatures_available()
 - add process_events_callback argument to do_verifications() to keep GUIs
 responsive (Jonathan Riddell)

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
 
18
18
from cStringIO import StringIO
19
 
import os
20
19
import subprocess
21
20
import sys
22
21
import threading
28
27
    transport,
29
28
    urlutils,
30
29
    )
31
 
from bzrlib.directory_service import directories
32
30
from bzrlib.transport import (
33
31
    chroot,
34
32
    fakenfs,
35
33
    http,
36
34
    local,
37
 
    location_to_url,
38
35
    memory,
39
36
    pathfilter,
40
37
    readonly,
41
38
    )
42
 
import bzrlib.transport.trace
43
39
from bzrlib.tests import (
44
40
    features,
45
41
    test_server,
52
48
class TestTransport(tests.TestCase):
53
49
    """Test the non transport-concrete class functionality."""
54
50
 
 
51
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
 
52
    # of try/finally -- vila 20100205
 
53
 
55
54
    def test__get_set_protocol_handlers(self):
56
55
        handlers = transport._get_protocol_handlers()
57
 
        self.assertNotEqual([], handlers.keys())
58
 
        transport._clear_protocol_handlers()
59
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
60
 
        self.assertEqual([], transport._get_protocol_handlers().keys())
 
56
        self.assertNotEqual([], handlers.keys( ))
 
57
        try:
 
58
            transport._clear_protocol_handlers()
 
59
            self.assertEqual([], transport._get_protocol_handlers().keys())
 
60
        finally:
 
61
            transport._set_protocol_handlers(handlers)
61
62
 
62
63
    def test_get_transport_modules(self):
63
64
        handlers = transport._get_protocol_handlers()
64
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
65
        # don't pollute the current handlers
66
66
        transport._clear_protocol_handlers()
67
 
 
68
67
        class SampleHandler(object):
69
68
            """I exist, isnt that enough?"""
70
 
        transport._clear_protocol_handlers()
71
 
        transport.register_transport_proto('foo')
72
 
        transport.register_lazy_transport('foo',
73
 
                                            'bzrlib.tests.test_transport',
74
 
                                            'TestTransport.SampleHandler')
75
 
        transport.register_transport_proto('bar')
76
 
        transport.register_lazy_transport('bar',
77
 
                                            'bzrlib.tests.test_transport',
78
 
                                            'TestTransport.SampleHandler')
79
 
        self.assertEqual([SampleHandler.__module__,
80
 
                            'bzrlib.transport.chroot',
81
 
                            'bzrlib.transport.pathfilter'],
82
 
                            transport._get_transport_modules())
 
69
        try:
 
70
            transport._clear_protocol_handlers()
 
71
            transport.register_transport_proto('foo')
 
72
            transport.register_lazy_transport('foo',
 
73
                                              'bzrlib.tests.test_transport',
 
74
                                              'TestTransport.SampleHandler')
 
75
            transport.register_transport_proto('bar')
 
76
            transport.register_lazy_transport('bar',
 
77
                                              'bzrlib.tests.test_transport',
 
78
                                              'TestTransport.SampleHandler')
 
79
            self.assertEqual([SampleHandler.__module__,
 
80
                              'bzrlib.transport.chroot',
 
81
                              'bzrlib.transport.pathfilter'],
 
82
                             transport._get_transport_modules())
 
83
        finally:
 
84
            transport._set_protocol_handlers(handlers)
83
85
 
84
86
    def test_transport_dependency(self):
85
87
        """Transport with missing dependency causes no error"""
86
88
        saved_handlers = transport._get_protocol_handlers()
87
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
88
89
        # don't pollute the current handlers
89
90
        transport._clear_protocol_handlers()
90
 
        transport.register_transport_proto('foo')
91
 
        transport.register_lazy_transport(
92
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
93
91
        try:
94
 
            transport.get_transport_from_url('foo://fooserver/foo')
95
 
        except errors.UnsupportedProtocol, e:
96
 
            e_str = str(e)
97
 
            self.assertEquals('Unsupported protocol'
98
 
                                ' for url "foo://fooserver/foo":'
99
 
                                ' Unable to import library "some_lib":'
100
 
                                ' testing missing dependency', str(e))
101
 
        else:
102
 
            self.fail('Did not raise UnsupportedProtocol')
 
92
            transport.register_transport_proto('foo')
 
93
            transport.register_lazy_transport(
 
94
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
95
            try:
 
96
                transport.get_transport('foo://fooserver/foo')
 
97
            except errors.UnsupportedProtocol, e:
 
98
                e_str = str(e)
 
99
                self.assertEquals('Unsupported protocol'
 
100
                                  ' for url "foo://fooserver/foo":'
 
101
                                  ' Unable to import library "some_lib":'
 
102
                                  ' testing missing dependency', str(e))
 
103
            else:
 
104
                self.fail('Did not raise UnsupportedProtocol')
 
105
        finally:
 
106
            # restore original values
 
107
            transport._set_protocol_handlers(saved_handlers)
103
108
 
104
109
    def test_transport_fallback(self):
105
110
        """Transport with missing dependency causes no error"""
106
111
        saved_handlers = transport._get_protocol_handlers()
107
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
108
 
        transport._clear_protocol_handlers()
109
 
        transport.register_transport_proto('foo')
110
 
        transport.register_lazy_transport(
111
 
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
112
 
        transport.register_lazy_transport(
113
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
114
 
        t = transport.get_transport_from_url('foo://fooserver/foo')
115
 
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
112
        try:
 
113
            transport._clear_protocol_handlers()
 
114
            transport.register_transport_proto('foo')
 
115
            transport.register_lazy_transport(
 
116
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
117
            transport.register_lazy_transport(
 
118
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
119
            t = transport.get_transport('foo://fooserver/foo')
 
120
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
121
        finally:
 
122
            transport._set_protocol_handlers(saved_handlers)
116
123
 
117
124
    def test_ssh_hints(self):
118
125
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
119
126
        try:
120
 
            transport.get_transport_from_url('ssh://fooserver/foo')
 
127
            transport.get_transport('ssh://fooserver/foo')
121
128
        except errors.UnsupportedProtocol, e:
122
129
            e_str = str(e)
123
130
            self.assertEquals('Unsupported protocol'
211
218
 
212
219
    def test_coalesce_fudge(self):
213
220
        self.check([(10, 30, [(0, 10), (20, 10)]),
214
 
                    (100, 10, [(0, 10)]),
 
221
                    (100, 10, [(0, 10),]),
215
222
                   ], [(10, 10), (30, 10), (100, 10)],
216
 
                   fudge=10)
217
 
 
 
223
                   fudge=10
 
224
                  )
218
225
    def test_coalesce_max_size(self):
219
226
        self.check([(10, 20, [(0, 10), (10, 10)]),
220
227
                    (30, 50, [(0, 50)]),
221
228
                    # If one range is above max_size, it gets its own coalesced
222
229
                    # offset
223
 
                    (100, 80, [(0, 80)]),],
 
230
                    (100, 80, [(0, 80),]),],
224
231
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
225
 
                   max_size=50)
 
232
                   max_size=50
 
233
                  )
226
234
 
227
235
    def test_coalesce_no_max_size(self):
228
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
 
236
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
229
237
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
230
238
                  )
231
239
 
232
240
    def test_coalesce_default_limit(self):
233
241
        # By default we use a 100MB max size.
234
 
        ten_mb = 10 * 1024 * 1024
235
 
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
242
        ten_mb = 10*1024*1024
 
243
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
236
244
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
237
245
                   [(i*ten_mb, ten_mb) for i in range(11)])
238
 
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
239
 
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
246
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
247
                   [(i*ten_mb, ten_mb) for i in range(11)],
240
248
                   max_size=1*1024*1024*1024)
241
249
 
242
250
 
247
255
        server.start_server()
248
256
        url = server.get_url()
249
257
        self.assertTrue(url in transport.transport_list_registry)
250
 
        t = transport.get_transport_from_url(url)
 
258
        t = transport.get_transport(url)
251
259
        del t
252
260
        server.stop_server()
253
261
        self.assertFalse(url in transport.transport_list_registry)
368
376
    def test_abspath(self):
369
377
        # The abspath is always relative to the chroot_url.
370
378
        server = chroot.ChrootServer(
371
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
379
            transport.get_transport('memory:///foo/bar/'))
372
380
        self.start_server(server)
373
 
        t = transport.get_transport_from_url(server.get_url())
 
381
        t = transport.get_transport(server.get_url())
374
382
        self.assertEqual(server.get_url(), t.abspath('/'))
375
383
 
376
384
        subdir_t = t.clone('subdir')
378
386
 
379
387
    def test_clone(self):
380
388
        server = chroot.ChrootServer(
381
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
389
            transport.get_transport('memory:///foo/bar/'))
382
390
        self.start_server(server)
383
 
        t = transport.get_transport_from_url(server.get_url())
 
391
        t = transport.get_transport(server.get_url())
384
392
        # relpath from root and root path are the same
385
393
        relpath_cloned = t.clone('foo')
386
394
        abspath_cloned = t.clone('/foo')
395
403
        This is so that it is not possible to escape a chroot by doing::
396
404
            url = chroot_transport.base
397
405
            parent_url = urlutils.join(url, '..')
398
 
            new_t = transport.get_transport_from_url(parent_url)
 
406
            new_t = transport.get_transport(parent_url)
399
407
        """
400
408
        server = chroot.ChrootServer(
401
 
            transport.get_transport_from_url('memory:///path/subpath'))
 
409
            transport.get_transport('memory:///path/subpath'))
402
410
        self.start_server(server)
403
 
        t = transport.get_transport_from_url(server.get_url())
404
 
        new_t = transport.get_transport_from_url(t.base)
 
411
        t = transport.get_transport(server.get_url())
 
412
        new_t = transport.get_transport(t.base)
405
413
        self.assertEqual(t.server, new_t.server)
406
414
        self.assertEqual(t.base, new_t.base)
407
415
 
412
420
        This is so that it is not possible to escape a chroot by doing::
413
421
            url = chroot_transport.base
414
422
            parent_url = urlutils.join(url, '..')
415
 
            new_t = transport.get_transport_from_url(parent_url)
 
423
            new_t = transport.get_transport(parent_url)
416
424
        """
417
 
        server = chroot.ChrootServer(
418
 
            transport.get_transport_from_url('memory:///path/'))
 
425
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
419
426
        self.start_server(server)
420
 
        t = transport.get_transport_from_url(server.get_url())
 
427
        t = transport.get_transport(server.get_url())
421
428
        self.assertRaises(
422
429
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
423
430
 
433
440
        backing_transport = memory.MemoryTransport()
434
441
        server = chroot.ChrootServer(backing_transport)
435
442
        server.start_server()
436
 
        self.addCleanup(server.stop_server)
437
 
        self.assertTrue(server.scheme
438
 
                        in transport._get_protocol_handlers().keys())
 
443
        try:
 
444
            self.assertTrue(server.scheme
 
445
                            in transport._get_protocol_handlers().keys())
 
446
        finally:
 
447
            server.stop_server()
439
448
 
440
449
    def test_stop_server(self):
441
450
        backing_transport = memory.MemoryTransport()
449
458
        backing_transport = memory.MemoryTransport()
450
459
        server = chroot.ChrootServer(backing_transport)
451
460
        server.start_server()
452
 
        self.addCleanup(server.stop_server)
453
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
461
        try:
 
462
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
463
        finally:
 
464
            server.stop_server()
454
465
 
455
466
 
456
467
class PathFilteringDecoratorTransportTest(tests.TestCase):
459
470
    def test_abspath(self):
460
471
        # The abspath is always relative to the base of the backing transport.
461
472
        server = pathfilter.PathFilteringServer(
462
 
            transport.get_transport_from_url('memory:///foo/bar/'),
 
473
            transport.get_transport('memory:///foo/bar/'),
463
474
            lambda x: x)
464
475
        server.start_server()
465
 
        t = transport.get_transport_from_url(server.get_url())
 
476
        t = transport.get_transport(server.get_url())
466
477
        self.assertEqual(server.get_url(), t.abspath('/'))
467
478
 
468
479
        subdir_t = t.clone('subdir')
471
482
 
472
483
    def make_pf_transport(self, filter_func=None):
473
484
        """Make a PathFilteringTransport backed by a MemoryTransport.
474
 
 
 
485
        
475
486
        :param filter_func: by default this will be a no-op function.  Use this
476
487
            parameter to override it."""
477
488
        if filter_func is None:
478
489
            filter_func = lambda x: x
479
490
        server = pathfilter.PathFilteringServer(
480
 
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
 
491
            transport.get_transport('memory:///foo/bar/'), filter_func)
481
492
        server.start_server()
482
493
        self.addCleanup(server.stop_server)
483
 
        return transport.get_transport_from_url(server.get_url())
 
494
        return transport.get_transport(server.get_url())
484
495
 
485
496
    def test__filter(self):
486
497
        # _filter (with an identity func as filter_func) always returns
499
510
 
500
511
    def test_filter_invocation(self):
501
512
        filter_log = []
502
 
 
503
513
        def filter(path):
504
514
            filter_log.append(path)
505
515
            return path
530
540
        otherwise) the filtering by doing::
531
541
            url = filtered_transport.base
532
542
            parent_url = urlutils.join(url, '..')
533
 
            new_t = transport.get_transport_from_url(parent_url)
 
543
            new_t = transport.get_transport(parent_url)
534
544
        """
535
545
        t = self.make_pf_transport()
536
 
        new_t = transport.get_transport_from_url(t.base)
 
546
        new_t = transport.get_transport(t.base)
537
547
        self.assertEqual(t.server, new_t.server)
538
548
        self.assertEqual(t.base, new_t.base)
539
549
 
552
562
        # connect to '.' via http which is not listable
553
563
        server = HttpServer()
554
564
        self.start_server(server)
555
 
        t = transport.get_transport_from_url('readonly+' + server.get_url())
 
565
        t = transport.get_transport('readonly+' + server.get_url())
556
566
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
557
567
        self.assertEqual(False, t.listable())
558
568
        self.assertEqual(True, t.is_readonly())
591
601
        # the url should be decorated appropriately
592
602
        self.assertStartsWith(server.get_url(), 'fakenfs+')
593
603
        # and we should be able to get a transport for it
594
 
        t = transport.get_transport_from_url(server.get_url())
 
604
        t = transport.get_transport(server.get_url())
595
605
        # which must be a FakeNFSTransportDecorator instance.
596
606
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
597
607
 
674
684
        base_url = self._server.get_url()
675
685
        url = self._adjust_url(base_url, relpath)
676
686
        # try getting the transport via the regular interface:
677
 
        t = transport.get_transport_from_url(url)
 
687
        t = transport.get_transport(url)
678
688
        # vila--20070607 if the following are commented out the test suite
679
689
        # still pass. Is this really still needed or was it a forgotten
680
690
        # temporary fix ?
685
695
        return t
686
696
 
687
697
 
688
 
class TestTransportFromPath(tests.TestCaseInTempDir):
689
 
 
690
 
    def test_with_path(self):
691
 
        t = transport.get_transport_from_path(self.test_dir)
692
 
        self.assertIsInstance(t, local.LocalTransport)
693
 
        self.assertEquals(t.base.rstrip("/"),
694
 
            urlutils.local_path_to_url(self.test_dir))
695
 
 
696
 
    def test_with_url(self):
697
 
        t = transport.get_transport_from_path("file:")
698
 
        self.assertIsInstance(t, local.LocalTransport)
699
 
        self.assertEquals(t.base.rstrip("/"),
700
 
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
701
 
 
702
 
 
703
 
class TestTransportFromUrl(tests.TestCaseInTempDir):
704
 
 
705
 
    def test_with_path(self):
706
 
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
707
 
            self.test_dir)
708
 
 
709
 
    def test_with_url(self):
710
 
        url = urlutils.local_path_to_url(self.test_dir)
711
 
        t = transport.get_transport_from_url(url)
712
 
        self.assertIsInstance(t, local.LocalTransport)
713
 
        self.assertEquals(t.base.rstrip("/"), url)
714
 
 
715
 
 
716
698
class TestLocalTransports(tests.TestCase):
717
699
 
718
700
    def test_get_transport_from_abspath(self):
740
722
        self.assertEquals(t.local_abspath(''), here)
741
723
 
742
724
 
743
 
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
744
 
 
745
 
    def test_local_fdatasync_calls_fdatasync(self):
746
 
        """Check fdatasync on a stream tries to flush the data to the OS.
747
 
        
748
 
        We can't easily observe the external effect but we can at least see
749
 
        it's called.
750
 
        """
751
 
        sentinel = object()
752
 
        fdatasync = getattr(os, 'fdatasync', sentinel)
753
 
        if fdatasync is sentinel:
754
 
            raise tests.TestNotApplicable('fdatasync not supported')
755
 
        t = self.get_transport('.')
756
 
        calls = self.recordCalls(os, 'fdatasync')
757
 
        w = t.open_write_stream('out')
758
 
        w.write('foo')
759
 
        w.fdatasync()
760
 
        with open('out', 'rb') as f:
761
 
            # Should have been flushed.
762
 
            self.assertEquals(f.read(), 'foo')
763
 
        self.assertEquals(len(calls), 1, calls)
764
 
 
765
 
 
766
725
class TestWin32LocalTransport(tests.TestCase):
767
726
 
768
727
    def test_unc_clone_to_root(self):
817
776
    def test_relpath(self):
818
777
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
819
778
 
820
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
821
 
            'sub')
 
779
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
822
780
        self.assertRaises(errors.PathNotChild, t.relpath,
823
781
                          'http://user@host.com/abs/path/sub')
824
782
        self.assertRaises(errors.PathNotChild, t.relpath,
868
826
 
869
827
    def test_reuse_same_transport(self):
870
828
        possible_transports = []
871
 
        t1 = transport.get_transport_from_url('http://foo/',
 
829
        t1 = transport.get_transport('http://foo/',
872
830
                                     possible_transports=possible_transports)
873
831
        self.assertEqual([t1], possible_transports)
874
 
        t2 = transport.get_transport_from_url('http://foo/',
 
832
        t2 = transport.get_transport('http://foo/',
875
833
                                     possible_transports=[t1])
876
834
        self.assertIs(t1, t2)
877
835
 
878
836
        # Also check that final '/' are handled correctly
879
 
        t3 = transport.get_transport_from_url('http://foo/path/')
880
 
        t4 = transport.get_transport_from_url('http://foo/path',
 
837
        t3 = transport.get_transport('http://foo/path/')
 
838
        t4 = transport.get_transport('http://foo/path',
881
839
                                     possible_transports=[t3])
882
840
        self.assertIs(t3, t4)
883
841
 
884
 
        t5 = transport.get_transport_from_url('http://foo/path')
885
 
        t6 = transport.get_transport_from_url('http://foo/path/',
 
842
        t5 = transport.get_transport('http://foo/path')
 
843
        t6 = transport.get_transport('http://foo/path/',
886
844
                                     possible_transports=[t5])
887
845
        self.assertIs(t5, t6)
888
846
 
889
847
    def test_don_t_reuse_different_transport(self):
890
 
        t1 = transport.get_transport_from_url('http://foo/path')
891
 
        t2 = transport.get_transport_from_url('http://bar/path',
 
848
        t1 = transport.get_transport('http://foo/path')
 
849
        t2 = transport.get_transport('http://bar/path',
892
850
                                     possible_transports=[t1])
893
851
        self.assertIsNot(t1, t2)
894
852
 
895
853
 
896
854
class TestTransportTrace(tests.TestCase):
897
855
 
898
 
    def test_decorator(self):
899
 
        t = transport.get_transport_from_url('trace+memory://')
900
 
        self.assertIsInstance(
901
 
            t, bzrlib.transport.trace.TransportTraceDecorator)
 
856
    def test_get(self):
 
857
        t = transport.get_transport('trace+memory://')
 
858
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
902
859
 
903
860
    def test_clone_preserves_activity(self):
904
 
        t = transport.get_transport_from_url('trace+memory://')
 
861
        t = transport.get_transport('trace+memory://')
905
862
        t2 = t.clone('.')
906
863
        self.assertTrue(t is not t2)
907
864
        self.assertTrue(t._activity is t2._activity)
911
868
    # still won't cause a test failure when the top level Transport API
912
869
    # changes; so there is little return doing that.
913
870
    def test_get(self):
914
 
        t = transport.get_transport_from_url('trace+memory:///')
 
871
        t = transport.get_transport('trace+memory:///')
915
872
        t.put_bytes('foo', 'barish')
916
873
        t.get('foo')
917
874
        expected_result = []
923
880
        self.assertEqual(expected_result, t._activity)
924
881
 
925
882
    def test_readv(self):
926
 
        t = transport.get_transport_from_url('trace+memory:///')
 
883
        t = transport.get_transport('trace+memory:///')
927
884
        t.put_bytes('foo', 'barish')
928
885
        list(t.readv('foo', [(0, 1), (3, 2)],
929
886
                     adjust_for_latency=True, upper_limit=6))
939
896
class TestSSHConnections(tests.TestCaseWithTransport):
940
897
 
941
898
    def test_bzr_connect_to_bzr_ssh(self):
942
 
        """get_transport of a bzr+ssh:// behaves correctly.
 
899
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
943
900
 
944
901
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
945
902
        """
961
918
        # SSH channel ourselves.  Surely this has already been implemented
962
919
        # elsewhere?
963
920
        started = []
964
 
 
965
921
        class StubSSHServer(stub_sftp.StubServer):
966
922
 
967
923
            test = self
973
929
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
974
930
 
975
931
                # XXX: horribly inefficient, not to mention ugly.
976
 
                # Start a thread for each of stdin/out/err, and relay bytes
977
 
                # from the subprocess to channel and vice versa.
 
932
                # Start a thread for each of stdin/out/err, and relay bytes from
 
933
                # the subprocess to channel and vice versa.
978
934
                def ferry_bytes(read, write, close):
979
935
                    while True:
980
936
                        bytes = read(1)
1048
1004
        result = http.unhtml_roughly(fake_html)
1049
1005
        self.assertEquals(len(result), 1000)
1050
1006
        self.assertStartsWith(result, " something!")
1051
 
 
1052
 
 
1053
 
class SomeDirectory(object):
1054
 
 
1055
 
    def look_up(self, name, url):
1056
 
        return "http://bar"
1057
 
 
1058
 
 
1059
 
class TestLocationToUrl(tests.TestCase):
1060
 
 
1061
 
    def test_regular_url(self):
1062
 
        self.assertEquals("file://foo", location_to_url("file://foo"))
1063
 
 
1064
 
    def test_directory(self):
1065
 
        directories.register("bar:", SomeDirectory, "Dummy directory")
1066
 
        self.addCleanup(directories.remove, "bar:")
1067
 
        self.assertEquals("http://bar", location_to_url("bar:"))
1068
 
 
1069
 
    def test_unicode_url(self):
1070
 
        self.assertRaises(errors.InvalidURL, location_to_url,
1071
 
            "http://fo/\xc3\xaf".decode("utf-8"))
1072
 
 
1073
 
    def test_unicode_path(self):
1074
 
        self.assertEquals("file:///foo/bar%C3%AF",
1075
 
            location_to_url("/foo/bar\xc3\xaf".decode("utf-8")))
1076
 
 
1077
 
    def test_path(self):
1078
 
        self.assertEquals("file:///foo/bar", location_to_url("/foo/bar"))
1079
 
 
1080
 
    def test_relative_file_url(self):
1081
 
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
1082
 
            location_to_url("file:bar"))
1083
 
 
1084
 
    def test_absolute_file_url(self):
1085
 
        self.assertEquals("file:///bar", location_to_url("file:/bar"))