~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2016-04-03 16:32:31 UTC
  • mto: This revision was merged to the branch mainline in revision 6617.
  • Revision ID: jelmer@jelmer.uk-20160403163231-h72bo0uyek2gikw0
Don't put French text in doc/en/user-reference when LANGUAGE=fr_CH.UTF_8.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
29
29
    transport,
30
30
    urlutils,
31
31
    )
 
32
from bzrlib.directory_service import directories
32
33
from bzrlib.transport import (
33
34
    chroot,
34
35
    fakenfs,
35
36
    http,
36
37
    local,
 
38
    location_to_url,
37
39
    memory,
38
40
    pathfilter,
39
41
    readonly,
40
42
    )
 
43
import bzrlib.transport.trace
41
44
from bzrlib.tests import (
42
45
    features,
43
46
    test_server,
50
53
class TestTransport(tests.TestCase):
51
54
    """Test the non transport-concrete class functionality."""
52
55
 
53
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
54
 
    # of try/finally -- vila 20100205
55
 
 
56
56
    def test__get_set_protocol_handlers(self):
57
57
        handlers = transport._get_protocol_handlers()
58
 
        self.assertNotEqual([], handlers.keys( ))
59
 
        try:
60
 
            transport._clear_protocol_handlers()
61
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
62
 
        finally:
63
 
            transport._set_protocol_handlers(handlers)
 
58
        self.assertNotEqual([], handlers.keys())
 
59
        transport._clear_protocol_handlers()
 
60
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
61
        self.assertEqual([], transport._get_protocol_handlers().keys())
64
62
 
65
63
    def test_get_transport_modules(self):
66
64
        handlers = transport._get_protocol_handlers()
 
65
        self.addCleanup(transport._set_protocol_handlers, handlers)
67
66
        # don't pollute the current handlers
68
67
        transport._clear_protocol_handlers()
 
68
 
69
69
        class SampleHandler(object):
70
70
            """I exist, isnt that enough?"""
71
 
        try:
72
 
            transport._clear_protocol_handlers()
73
 
            transport.register_transport_proto('foo')
74
 
            transport.register_lazy_transport('foo',
75
 
                                              'bzrlib.tests.test_transport',
76
 
                                              'TestTransport.SampleHandler')
77
 
            transport.register_transport_proto('bar')
78
 
            transport.register_lazy_transport('bar',
79
 
                                              'bzrlib.tests.test_transport',
80
 
                                              'TestTransport.SampleHandler')
81
 
            self.assertEqual([SampleHandler.__module__,
82
 
                              'bzrlib.transport.chroot',
83
 
                              'bzrlib.transport.pathfilter'],
84
 
                             transport._get_transport_modules())
85
 
        finally:
86
 
            transport._set_protocol_handlers(handlers)
 
71
        transport._clear_protocol_handlers()
 
72
        transport.register_transport_proto('foo')
 
73
        transport.register_lazy_transport('foo',
 
74
                                            'bzrlib.tests.test_transport',
 
75
                                            'TestTransport.SampleHandler')
 
76
        transport.register_transport_proto('bar')
 
77
        transport.register_lazy_transport('bar',
 
78
                                            'bzrlib.tests.test_transport',
 
79
                                            'TestTransport.SampleHandler')
 
80
        self.assertEqual([SampleHandler.__module__,
 
81
                            'bzrlib.transport.chroot',
 
82
                            'bzrlib.transport.pathfilter'],
 
83
                            transport._get_transport_modules())
87
84
 
88
85
    def test_transport_dependency(self):
89
86
        """Transport with missing dependency causes no error"""
90
87
        saved_handlers = transport._get_protocol_handlers()
 
88
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
91
89
        # don't pollute the current handlers
92
90
        transport._clear_protocol_handlers()
 
91
        transport.register_transport_proto('foo')
 
92
        transport.register_lazy_transport(
 
93
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
93
94
        try:
94
 
            transport.register_transport_proto('foo')
95
 
            transport.register_lazy_transport(
96
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
97
 
            try:
98
 
                transport.get_transport('foo://fooserver/foo')
99
 
            except errors.UnsupportedProtocol, e:
100
 
                e_str = str(e)
101
 
                self.assertEquals('Unsupported protocol'
102
 
                                  ' for url "foo://fooserver/foo":'
103
 
                                  ' Unable to import library "some_lib":'
104
 
                                  ' testing missing dependency', str(e))
105
 
            else:
106
 
                self.fail('Did not raise UnsupportedProtocol')
107
 
        finally:
108
 
            # restore original values
109
 
            transport._set_protocol_handlers(saved_handlers)
 
95
            transport.get_transport_from_url('foo://fooserver/foo')
 
96
        except errors.UnsupportedProtocol, e:
 
97
            e_str = str(e)
 
98
            self.assertEqual('Unsupported protocol'
 
99
                                ' for url "foo://fooserver/foo":'
 
100
                                ' Unable to import library "some_lib":'
 
101
                                ' testing missing dependency', str(e))
 
102
        else:
 
103
            self.fail('Did not raise UnsupportedProtocol')
110
104
 
111
105
    def test_transport_fallback(self):
112
106
        """Transport with missing dependency causes no error"""
113
107
        saved_handlers = transport._get_protocol_handlers()
114
 
        try:
115
 
            transport._clear_protocol_handlers()
116
 
            transport.register_transport_proto('foo')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
119
 
            transport.register_lazy_transport(
120
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
121
 
            t = transport.get_transport('foo://fooserver/foo')
122
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
123
 
        finally:
124
 
            transport._set_protocol_handlers(saved_handlers)
 
108
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
109
        transport._clear_protocol_handlers()
 
110
        transport.register_transport_proto('foo')
 
111
        transport.register_lazy_transport(
 
112
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
113
        transport.register_lazy_transport(
 
114
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
115
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
116
        self.assertTrue(isinstance(t, BackupTransportHandler))
125
117
 
126
118
    def test_ssh_hints(self):
127
119
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
128
120
        try:
129
 
            transport.get_transport('ssh://fooserver/foo')
 
121
            transport.get_transport_from_url('ssh://fooserver/foo')
130
122
        except errors.UnsupportedProtocol, e:
131
123
            e_str = str(e)
132
 
            self.assertEquals('Unsupported protocol'
 
124
            self.assertEqual('Unsupported protocol'
133
125
                              ' for url "ssh://fooserver/foo":'
134
126
                              ' bzr supports bzr+ssh to operate over ssh,'
135
127
                              ' use "bzr+ssh://fooserver/foo".',
147
139
        self.assertRaises(errors.ReadError, a_file.read, 40)
148
140
        a_file.close()
149
141
 
150
 
    def test__combine_paths(self):
151
 
        t = transport.Transport('/')
152
 
        self.assertEqual('/home/sarah/project/foo',
153
 
                         t._combine_paths('/home/sarah', 'project/foo'))
154
 
        self.assertEqual('/etc',
155
 
                         t._combine_paths('/home/sarah', '../../etc'))
156
 
        self.assertEqual('/etc',
157
 
                         t._combine_paths('/home/sarah', '../../../etc'))
158
 
        self.assertEqual('/etc',
159
 
                         t._combine_paths('/home/sarah', '/etc'))
160
 
 
161
142
    def test_local_abspath_non_local_transport(self):
162
143
        # the base implementation should throw
163
144
        t = memory.MemoryTransport()
220
201
 
221
202
    def test_coalesce_fudge(self):
222
203
        self.check([(10, 30, [(0, 10), (20, 10)]),
223
 
                    (100, 10, [(0, 10),]),
 
204
                    (100, 10, [(0, 10)]),
224
205
                   ], [(10, 10), (30, 10), (100, 10)],
225
 
                   fudge=10
226
 
                  )
 
206
                   fudge=10)
 
207
 
227
208
    def test_coalesce_max_size(self):
228
209
        self.check([(10, 20, [(0, 10), (10, 10)]),
229
210
                    (30, 50, [(0, 50)]),
230
211
                    # If one range is above max_size, it gets its own coalesced
231
212
                    # offset
232
 
                    (100, 80, [(0, 80),]),],
 
213
                    (100, 80, [(0, 80)]),],
233
214
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
234
 
                   max_size=50
235
 
                  )
 
215
                   max_size=50)
236
216
 
237
217
    def test_coalesce_no_max_size(self):
238
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
218
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
239
219
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
240
220
                  )
241
221
 
242
222
    def test_coalesce_default_limit(self):
243
223
        # By default we use a 100MB max size.
244
 
        ten_mb = 10*1024*1024
245
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
224
        ten_mb = 10 * 1024 * 1024
 
225
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
246
226
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
247
227
                   [(i*ten_mb, ten_mb) for i in range(11)])
248
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
249
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
228
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
229
                   [(i * ten_mb, ten_mb) for i in range(11)],
250
230
                   max_size=1*1024*1024*1024)
251
231
 
252
232
 
257
237
        server.start_server()
258
238
        url = server.get_url()
259
239
        self.assertTrue(url in transport.transport_list_registry)
260
 
        t = transport.get_transport(url)
 
240
        t = transport.get_transport_from_url(url)
261
241
        del t
262
242
        server.stop_server()
263
243
        self.assertFalse(url in transport.transport_list_registry)
318
298
 
319
299
    def test_has_missing(self):
320
300
        t = memory.MemoryTransport()
321
 
        self.assertEquals(False, t.has('foo'))
 
301
        self.assertEqual(False, t.has('foo'))
322
302
 
323
303
    def test_has_present(self):
324
304
        t = memory.MemoryTransport()
325
305
        t.append_bytes('foo', 'content')
326
 
        self.assertEquals(True, t.has('foo'))
 
306
        self.assertEqual(True, t.has('foo'))
327
307
 
328
308
    def test_list_dir(self):
329
309
        t = memory.MemoryTransport()
332
312
        t.put_bytes('dir/subfoo', 'content')
333
313
        t.put_bytes('dirlike', 'content')
334
314
 
335
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
336
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
315
        self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
316
        self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
337
317
 
338
318
    def test_mkdir(self):
339
319
        t = memory.MemoryTransport()
378
358
    def test_abspath(self):
379
359
        # The abspath is always relative to the chroot_url.
380
360
        server = chroot.ChrootServer(
381
 
            transport.get_transport('memory:///foo/bar/'))
 
361
            transport.get_transport_from_url('memory:///foo/bar/'))
382
362
        self.start_server(server)
383
 
        t = transport.get_transport(server.get_url())
 
363
        t = transport.get_transport_from_url(server.get_url())
384
364
        self.assertEqual(server.get_url(), t.abspath('/'))
385
365
 
386
366
        subdir_t = t.clone('subdir')
388
368
 
389
369
    def test_clone(self):
390
370
        server = chroot.ChrootServer(
391
 
            transport.get_transport('memory:///foo/bar/'))
 
371
            transport.get_transport_from_url('memory:///foo/bar/'))
392
372
        self.start_server(server)
393
 
        t = transport.get_transport(server.get_url())
 
373
        t = transport.get_transport_from_url(server.get_url())
394
374
        # relpath from root and root path are the same
395
375
        relpath_cloned = t.clone('foo')
396
376
        abspath_cloned = t.clone('/foo')
405
385
        This is so that it is not possible to escape a chroot by doing::
406
386
            url = chroot_transport.base
407
387
            parent_url = urlutils.join(url, '..')
408
 
            new_t = transport.get_transport(parent_url)
 
388
            new_t = transport.get_transport_from_url(parent_url)
409
389
        """
410
390
        server = chroot.ChrootServer(
411
 
            transport.get_transport('memory:///path/subpath'))
 
391
            transport.get_transport_from_url('memory:///path/subpath'))
412
392
        self.start_server(server)
413
 
        t = transport.get_transport(server.get_url())
414
 
        new_t = transport.get_transport(t.base)
 
393
        t = transport.get_transport_from_url(server.get_url())
 
394
        new_t = transport.get_transport_from_url(t.base)
415
395
        self.assertEqual(t.server, new_t.server)
416
396
        self.assertEqual(t.base, new_t.base)
417
397
 
422
402
        This is so that it is not possible to escape a chroot by doing::
423
403
            url = chroot_transport.base
424
404
            parent_url = urlutils.join(url, '..')
425
 
            new_t = transport.get_transport(parent_url)
 
405
            new_t = transport.get_transport_from_url(parent_url)
426
406
        """
427
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
407
        server = chroot.ChrootServer(
 
408
            transport.get_transport_from_url('memory:///path/'))
428
409
        self.start_server(server)
429
 
        t = transport.get_transport(server.get_url())
 
410
        t = transport.get_transport_from_url(server.get_url())
430
411
        self.assertRaises(
431
412
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
432
413
 
442
423
        backing_transport = memory.MemoryTransport()
443
424
        server = chroot.ChrootServer(backing_transport)
444
425
        server.start_server()
445
 
        try:
446
 
            self.assertTrue(server.scheme
447
 
                            in transport._get_protocol_handlers().keys())
448
 
        finally:
449
 
            server.stop_server()
 
426
        self.addCleanup(server.stop_server)
 
427
        self.assertTrue(server.scheme
 
428
                        in transport._get_protocol_handlers().keys())
450
429
 
451
430
    def test_stop_server(self):
452
431
        backing_transport = memory.MemoryTransport()
460
439
        backing_transport = memory.MemoryTransport()
461
440
        server = chroot.ChrootServer(backing_transport)
462
441
        server.start_server()
463
 
        try:
464
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
465
 
        finally:
466
 
            server.stop_server()
 
442
        self.addCleanup(server.stop_server)
 
443
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
444
 
 
445
 
 
446
class TestHooks(tests.TestCase):
 
447
    """Basic tests for transport hooks"""
 
448
 
 
449
    def _get_connected_transport(self):
 
450
        return transport.ConnectedTransport("bogus:nowhere")
 
451
 
 
452
    def test_transporthooks_initialisation(self):
 
453
        """Check all expected transport hook points are set up"""
 
454
        hookpoint = transport.TransportHooks()
 
455
        self.assertTrue("post_connect" in hookpoint,
 
456
            "post_connect not in %s" % (hookpoint,))
 
457
 
 
458
    def test_post_connect(self):
 
459
        """Ensure the post_connect hook is called when _set_transport is"""
 
460
        calls = []
 
461
        transport.Transport.hooks.install_named_hook("post_connect",
 
462
            calls.append, None)
 
463
        t = self._get_connected_transport()
 
464
        self.assertLength(0, calls)
 
465
        t._set_connection("connection", "auth")
 
466
        self.assertEqual(calls, [t])
467
467
 
468
468
 
469
469
class PathFilteringDecoratorTransportTest(tests.TestCase):
472
472
    def test_abspath(self):
473
473
        # The abspath is always relative to the base of the backing transport.
474
474
        server = pathfilter.PathFilteringServer(
475
 
            transport.get_transport('memory:///foo/bar/'),
 
475
            transport.get_transport_from_url('memory:///foo/bar/'),
476
476
            lambda x: x)
477
477
        server.start_server()
478
 
        t = transport.get_transport(server.get_url())
 
478
        t = transport.get_transport_from_url(server.get_url())
479
479
        self.assertEqual(server.get_url(), t.abspath('/'))
480
480
 
481
481
        subdir_t = t.clone('subdir')
484
484
 
485
485
    def make_pf_transport(self, filter_func=None):
486
486
        """Make a PathFilteringTransport backed by a MemoryTransport.
487
 
        
 
487
 
488
488
        :param filter_func: by default this will be a no-op function.  Use this
489
489
            parameter to override it."""
490
490
        if filter_func is None:
491
491
            filter_func = lambda x: x
492
492
        server = pathfilter.PathFilteringServer(
493
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
493
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
494
494
        server.start_server()
495
495
        self.addCleanup(server.stop_server)
496
 
        return transport.get_transport(server.get_url())
 
496
        return transport.get_transport_from_url(server.get_url())
497
497
 
498
498
    def test__filter(self):
499
499
        # _filter (with an identity func as filter_func) always returns
512
512
 
513
513
    def test_filter_invocation(self):
514
514
        filter_log = []
 
515
 
515
516
        def filter(path):
516
517
            filter_log.append(path)
517
518
            return path
542
543
        otherwise) the filtering by doing::
543
544
            url = filtered_transport.base
544
545
            parent_url = urlutils.join(url, '..')
545
 
            new_t = transport.get_transport(parent_url)
 
546
            new_t = transport.get_transport_from_url(parent_url)
546
547
        """
547
548
        t = self.make_pf_transport()
548
 
        new_t = transport.get_transport(t.base)
 
549
        new_t = transport.get_transport_from_url(t.base)
549
550
        self.assertEqual(t.server, new_t.server)
550
551
        self.assertEqual(t.base, new_t.base)
551
552
 
564
565
        # connect to '.' via http which is not listable
565
566
        server = HttpServer()
566
567
        self.start_server(server)
567
 
        t = transport.get_transport('readonly+' + server.get_url())
 
568
        t = transport.get_transport_from_url('readonly+' + server.get_url())
568
569
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
569
570
        self.assertEqual(False, t.listable())
570
571
        self.assertEqual(True, t.is_readonly())
603
604
        # the url should be decorated appropriately
604
605
        self.assertStartsWith(server.get_url(), 'fakenfs+')
605
606
        # and we should be able to get a transport for it
606
 
        t = transport.get_transport(server.get_url())
 
607
        t = transport.get_transport_from_url(server.get_url())
607
608
        # which must be a FakeNFSTransportDecorator instance.
608
609
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
609
610
 
686
687
        base_url = self._server.get_url()
687
688
        url = self._adjust_url(base_url, relpath)
688
689
        # try getting the transport via the regular interface:
689
 
        t = transport.get_transport(url)
 
690
        t = transport.get_transport_from_url(url)
690
691
        # vila--20070607 if the following are commented out the test suite
691
692
        # still pass. Is this really still needed or was it a forgotten
692
693
        # temporary fix ?
697
698
        return t
698
699
 
699
700
 
 
701
class TestTransportFromPath(tests.TestCaseInTempDir):
 
702
 
 
703
    def test_with_path(self):
 
704
        t = transport.get_transport_from_path(self.test_dir)
 
705
        self.assertIsInstance(t, local.LocalTransport)
 
706
        self.assertEqual(t.base.rstrip("/"),
 
707
            urlutils.local_path_to_url(self.test_dir))
 
708
 
 
709
    def test_with_url(self):
 
710
        t = transport.get_transport_from_path("file:")
 
711
        self.assertIsInstance(t, local.LocalTransport)
 
712
        self.assertEqual(t.base.rstrip("/"),
 
713
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
714
 
 
715
 
 
716
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
717
 
 
718
    def test_with_path(self):
 
719
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
720
            self.test_dir)
 
721
 
 
722
    def test_with_url(self):
 
723
        url = urlutils.local_path_to_url(self.test_dir)
 
724
        t = transport.get_transport_from_url(url)
 
725
        self.assertIsInstance(t, local.LocalTransport)
 
726
        self.assertEqual(t.base.rstrip("/"), url)
 
727
 
 
728
    def test_with_url_and_segment_parameters(self):
 
729
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
 
730
        t = transport.get_transport_from_url(url)
 
731
        self.assertIsInstance(t, local.LocalTransport)
 
732
        self.assertEqual(t.base.rstrip("/"), url)
 
733
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
 
734
            f.write("data")
 
735
        self.assertTrue(t.has("afile"))
 
736
 
 
737
 
700
738
class TestLocalTransports(tests.TestCase):
701
739
 
702
740
    def test_get_transport_from_abspath(self):
703
741
        here = osutils.abspath('.')
704
742
        t = transport.get_transport(here)
705
743
        self.assertIsInstance(t, local.LocalTransport)
706
 
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
744
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
707
745
 
708
746
    def test_get_transport_from_relpath(self):
709
747
        here = osutils.abspath('.')
710
748
        t = transport.get_transport('.')
711
749
        self.assertIsInstance(t, local.LocalTransport)
712
 
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
750
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
713
751
 
714
752
    def test_get_transport_from_local_url(self):
715
753
        here = osutils.abspath('.')
716
754
        here_url = urlutils.local_path_to_url(here) + '/'
717
755
        t = transport.get_transport(here_url)
718
756
        self.assertIsInstance(t, local.LocalTransport)
719
 
        self.assertEquals(t.base, here_url)
 
757
        self.assertEqual(t.base, here_url)
720
758
 
721
759
    def test_local_abspath(self):
722
760
        here = osutils.abspath('.')
723
761
        t = transport.get_transport(here)
724
 
        self.assertEquals(t.local_abspath(''), here)
 
762
        self.assertEqual(t.local_abspath(''), here)
725
763
 
726
764
 
727
765
class TestLocalTransportMutation(tests.TestCaseInTempDir):
755
793
        We can't easily observe the external effect but we can at least see
756
794
        it's called.
757
795
        """
 
796
        sentinel = object()
 
797
        fdatasync = getattr(os, 'fdatasync', sentinel)
 
798
        if fdatasync is sentinel:
 
799
            raise tests.TestNotApplicable('fdatasync not supported')
758
800
        t = self.get_transport('.')
759
801
        calls = self.recordCalls(os, 'fdatasync')
760
802
        w = t.open_write_stream('out')
762
804
        w.fdatasync()
763
805
        with open('out', 'rb') as f:
764
806
            # Should have been flushed.
765
 
            self.assertEquals(f.read(), 'foo')
766
 
        self.assertEquals(len(calls), 1, calls)
 
807
            self.assertEqual(f.read(), 'foo')
 
808
        self.assertEqual(len(calls), 1, calls)
 
809
 
 
810
    def test_missing_directory(self):
 
811
        t = self.get_transport('.')
 
812
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
767
813
 
768
814
 
769
815
class TestWin32LocalTransport(tests.TestCase):
770
816
 
771
817
    def test_unc_clone_to_root(self):
 
818
        self.requireFeature(features.win32_feature)
772
819
        # Win32 UNC path like \\HOST\path
773
820
        # clone to root should stop at least at \\HOST part
774
821
        # not on \\
775
822
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
776
823
        for i in xrange(4):
777
824
            t = t.clone('..')
778
 
        self.assertEquals(t.base, 'file://HOST/')
 
825
        self.assertEqual(t.base, 'file://HOST/')
779
826
        # make sure we reach the root
780
827
        t = t.clone('..')
781
 
        self.assertEquals(t.base, 'file://HOST/')
 
828
        self.assertEqual(t.base, 'file://HOST/')
782
829
 
783
830
 
784
831
class TestConnectedTransport(tests.TestCase):
787
834
    def test_parse_url(self):
788
835
        t = transport.ConnectedTransport(
789
836
            'http://simple.example.com/home/source')
790
 
        self.assertEquals(t._host, 'simple.example.com')
791
 
        self.assertEquals(t._port, None)
792
 
        self.assertEquals(t._path, '/home/source/')
793
 
        self.assertTrue(t._user is None)
794
 
        self.assertTrue(t._password is None)
 
837
        self.assertEqual(t._parsed_url.host, 'simple.example.com')
 
838
        self.assertEqual(t._parsed_url.port, None)
 
839
        self.assertEqual(t._parsed_url.path, '/home/source/')
 
840
        self.assertTrue(t._parsed_url.user is None)
 
841
        self.assertTrue(t._parsed_url.password is None)
795
842
 
796
 
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
843
        self.assertEqual(t.base, 'http://simple.example.com/home/source/')
797
844
 
798
845
    def test_parse_url_with_at_in_user(self):
799
846
        # Bug 228058
800
847
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
801
 
        self.assertEquals(t._user, 'user@host.com')
 
848
        self.assertEqual(t._parsed_url.user, 'user@host.com')
802
849
 
803
850
    def test_parse_quoted_url(self):
804
851
        t = transport.ConnectedTransport(
805
852
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
806
 
        self.assertEquals(t._host, 'exAmple.com')
807
 
        self.assertEquals(t._port, 2222)
808
 
        self.assertEquals(t._user, 'robey')
809
 
        self.assertEquals(t._password, 'h@t')
810
 
        self.assertEquals(t._path, '/path/')
 
853
        self.assertEqual(t._parsed_url.host, 'exAmple.com')
 
854
        self.assertEqual(t._parsed_url.port, 2222)
 
855
        self.assertEqual(t._parsed_url.user, 'robey')
 
856
        self.assertEqual(t._parsed_url.password, 'h@t')
 
857
        self.assertEqual(t._parsed_url.path, '/path/')
811
858
 
812
859
        # Base should not keep track of the password
813
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
860
        self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
814
861
 
815
862
    def test_parse_invalid_url(self):
816
863
        self.assertRaises(errors.InvalidURL,
820
867
    def test_relpath(self):
821
868
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
822
869
 
823
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
870
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
 
871
            'sub')
824
872
        self.assertRaises(errors.PathNotChild, t.relpath,
825
873
                          'http://user@host.com/abs/path/sub')
826
874
        self.assertRaises(errors.PathNotChild, t.relpath,
831
879
                          'sftp://user@host.com:33/abs/path/sub')
832
880
        # Make sure it works when we don't supply a username
833
881
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
834
 
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
882
        self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
835
883
 
836
884
        # Make sure it works when parts of the path will be url encoded
837
885
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
838
 
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
886
        self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
839
887
 
840
888
    def test_connection_sharing_propagate_credentials(self):
841
889
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
842
 
        self.assertEquals('user', t._user)
843
 
        self.assertEquals('host.com', t._host)
 
890
        self.assertEqual('user', t._parsed_url.user)
 
891
        self.assertEqual('host.com', t._parsed_url.host)
844
892
        self.assertIs(None, t._get_connection())
845
 
        self.assertIs(None, t._password)
 
893
        self.assertIs(None, t._parsed_url.password)
846
894
        c = t.clone('subdir')
847
895
        self.assertIs(None, c._get_connection())
848
 
        self.assertIs(None, t._password)
 
896
        self.assertIs(None, t._parsed_url.password)
849
897
 
850
898
        # Simulate the user entering a password
851
899
        password = 'secret'
870
918
 
871
919
    def test_reuse_same_transport(self):
872
920
        possible_transports = []
873
 
        t1 = transport.get_transport('http://foo/',
 
921
        t1 = transport.get_transport_from_url('http://foo/',
874
922
                                     possible_transports=possible_transports)
875
923
        self.assertEqual([t1], possible_transports)
876
 
        t2 = transport.get_transport('http://foo/',
 
924
        t2 = transport.get_transport_from_url('http://foo/',
877
925
                                     possible_transports=[t1])
878
926
        self.assertIs(t1, t2)
879
927
 
880
928
        # Also check that final '/' are handled correctly
881
 
        t3 = transport.get_transport('http://foo/path/')
882
 
        t4 = transport.get_transport('http://foo/path',
 
929
        t3 = transport.get_transport_from_url('http://foo/path/')
 
930
        t4 = transport.get_transport_from_url('http://foo/path',
883
931
                                     possible_transports=[t3])
884
932
        self.assertIs(t3, t4)
885
933
 
886
 
        t5 = transport.get_transport('http://foo/path')
887
 
        t6 = transport.get_transport('http://foo/path/',
 
934
        t5 = transport.get_transport_from_url('http://foo/path')
 
935
        t6 = transport.get_transport_from_url('http://foo/path/',
888
936
                                     possible_transports=[t5])
889
937
        self.assertIs(t5, t6)
890
938
 
891
939
    def test_don_t_reuse_different_transport(self):
892
 
        t1 = transport.get_transport('http://foo/path')
893
 
        t2 = transport.get_transport('http://bar/path',
 
940
        t1 = transport.get_transport_from_url('http://foo/path')
 
941
        t2 = transport.get_transport_from_url('http://bar/path',
894
942
                                     possible_transports=[t1])
895
943
        self.assertIsNot(t1, t2)
896
944
 
897
945
 
898
946
class TestTransportTrace(tests.TestCase):
899
947
 
900
 
    def test_get(self):
901
 
        t = transport.get_transport('trace+memory://')
902
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
948
    def test_decorator(self):
 
949
        t = transport.get_transport_from_url('trace+memory://')
 
950
        self.assertIsInstance(
 
951
            t, bzrlib.transport.trace.TransportTraceDecorator)
903
952
 
904
953
    def test_clone_preserves_activity(self):
905
 
        t = transport.get_transport('trace+memory://')
 
954
        t = transport.get_transport_from_url('trace+memory://')
906
955
        t2 = t.clone('.')
907
956
        self.assertTrue(t is not t2)
908
957
        self.assertTrue(t._activity is t2._activity)
912
961
    # still won't cause a test failure when the top level Transport API
913
962
    # changes; so there is little return doing that.
914
963
    def test_get(self):
915
 
        t = transport.get_transport('trace+memory:///')
 
964
        t = transport.get_transport_from_url('trace+memory:///')
916
965
        t.put_bytes('foo', 'barish')
917
966
        t.get('foo')
918
967
        expected_result = []
924
973
        self.assertEqual(expected_result, t._activity)
925
974
 
926
975
    def test_readv(self):
927
 
        t = transport.get_transport('trace+memory:///')
 
976
        t = transport.get_transport_from_url('trace+memory:///')
928
977
        t.put_bytes('foo', 'barish')
929
978
        list(t.readv('foo', [(0, 1), (3, 2)],
930
979
                     adjust_for_latency=True, upper_limit=6))
940
989
class TestSSHConnections(tests.TestCaseWithTransport):
941
990
 
942
991
    def test_bzr_connect_to_bzr_ssh(self):
943
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
992
        """get_transport of a bzr+ssh:// behaves correctly.
944
993
 
945
994
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
946
995
        """
962
1011
        # SSH channel ourselves.  Surely this has already been implemented
963
1012
        # elsewhere?
964
1013
        started = []
 
1014
 
965
1015
        class StubSSHServer(stub_sftp.StubServer):
966
1016
 
967
1017
            test = self
973
1023
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
974
1024
 
975
1025
                # XXX: horribly inefficient, not to mention ugly.
976
 
                # Start a thread for each of stdin/out/err, and relay bytes from
977
 
                # the subprocess to channel and vice versa.
 
1026
                # Start a thread for each of stdin/out/err, and relay bytes
 
1027
                # from the subprocess to channel and vice versa.
978
1028
                def ferry_bytes(read, write, close):
979
1029
                    while True:
980
1030
                        bytes = read(1)
1046
1096
    def test_truncation(self):
1047
1097
        fake_html = "<p>something!\n" * 1000
1048
1098
        result = http.unhtml_roughly(fake_html)
1049
 
        self.assertEquals(len(result), 1000)
 
1099
        self.assertEqual(len(result), 1000)
1050
1100
        self.assertStartsWith(result, " something!")
 
1101
 
 
1102
 
 
1103
class SomeDirectory(object):
 
1104
 
 
1105
    def look_up(self, name, url):
 
1106
        return "http://bar"
 
1107
 
 
1108
 
 
1109
class TestLocationToUrl(tests.TestCase):
 
1110
 
 
1111
    def get_base_location(self):
 
1112
        path = osutils.abspath('/foo/bar')
 
1113
        if path.startswith('/'):
 
1114
            url = 'file://%s' % (path,)
 
1115
        else:
 
1116
            # On Windows, abspaths start with the drive letter, so we have to
 
1117
            # add in the extra '/'
 
1118
            url = 'file:///%s' % (path,)
 
1119
        return path, url
 
1120
 
 
1121
    def test_regular_url(self):
 
1122
        self.assertEqual("file://foo", location_to_url("file://foo"))
 
1123
 
 
1124
    def test_directory(self):
 
1125
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1126
        self.addCleanup(directories.remove, "bar:")
 
1127
        self.assertEqual("http://bar", location_to_url("bar:"))
 
1128
 
 
1129
    def test_unicode_url(self):
 
1130
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1131
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1132
 
 
1133
    def test_unicode_path(self):
 
1134
        path, url = self.get_base_location()
 
1135
        location = path + "\xc3\xaf".decode("utf-8")
 
1136
        url += '%C3%AF'
 
1137
        self.assertEqual(url, location_to_url(location))
 
1138
 
 
1139
    def test_path(self):
 
1140
        path, url = self.get_base_location()
 
1141
        self.assertEqual(url, location_to_url(path))
 
1142
 
 
1143
    def test_relative_file_url(self):
 
1144
        self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
 
1145
            location_to_url("file:bar"))
 
1146
 
 
1147
    def test_absolute_file_url(self):
 
1148
        self.assertEqual("file:///bar", location_to_url("file:/bar"))