~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Aaron Bentley
  • Date: 2007-02-06 14:52:16 UTC
  • mfrom: (2266 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2268.
  • Revision ID: abentley@panoramicfeedback.com-20070206145216-fcpi8o3ufvuzwbp9
Merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
 
18
import os
 
19
import sys
 
20
import stat
18
21
from cStringIO import StringIO
19
22
 
20
23
import bzrlib
21
 
from bzrlib import (
22
 
    errors,
23
 
    osutils,
24
 
    urlutils,
25
 
    )
26
 
from bzrlib.errors import (DependencyNotPresent,
27
 
                           FileExists,
28
 
                           InvalidURLJoin,
29
 
                           NoSuchFile,
 
24
from bzrlib import urlutils
 
25
from bzrlib.errors import (NoSuchFile, FileExists,
 
26
                           TransportNotPossible,
 
27
                           ConnectionError,
 
28
                           DependencyNotPresent,
 
29
                           UnsupportedProtocol,
30
30
                           PathNotChild,
31
 
                           ReadError,
32
 
                           UnsupportedProtocol,
33
31
                           )
34
32
from bzrlib.tests import TestCase, TestCaseInTempDir
35
 
from bzrlib.transport import (_clear_protocol_handlers,
36
 
                              _CoalescedOffset,
37
 
                              ConnectedTransport,
 
33
from bzrlib.transport import (_CoalescedOffset,
38
34
                              _get_protocol_handlers,
39
 
                              _set_protocol_handlers,
40
35
                              _get_transport_modules,
41
36
                              get_transport,
42
 
                              LateReadError,
43
37
                              register_lazy_transport,
44
 
                              register_transport_proto,
 
38
                              _set_protocol_handlers,
45
39
                              Transport,
46
40
                              )
47
 
from bzrlib.transport.chroot import ChrootServer
48
41
from bzrlib.transport.memory import MemoryTransport
49
42
from bzrlib.transport.local import (LocalTransport,
50
43
                                    EmulatedWin32LocalTransport)
58
51
 
59
52
    def test__get_set_protocol_handlers(self):
60
53
        handlers = _get_protocol_handlers()
61
 
        self.assertNotEqual([], handlers.keys( ))
 
54
        self.assertNotEqual({}, handlers)
62
55
        try:
63
 
            _clear_protocol_handlers()
64
 
            self.assertEqual([], _get_protocol_handlers().keys())
 
56
            _set_protocol_handlers({})
 
57
            self.assertEqual({}, _get_protocol_handlers())
65
58
        finally:
66
59
            _set_protocol_handlers(handlers)
67
60
 
68
61
    def test_get_transport_modules(self):
69
62
        handlers = _get_protocol_handlers()
70
 
        # don't pollute the current handlers
71
 
        _clear_protocol_handlers()
72
63
        class SampleHandler(object):
73
64
            """I exist, isnt that enough?"""
74
65
        try:
75
 
            _clear_protocol_handlers()
76
 
            register_transport_proto('foo')
77
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
 
                                    'TestTransport.SampleHandler')
79
 
            register_transport_proto('bar')
80
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
 
                                    'TestTransport.SampleHandler')
82
 
            self.assertEqual([SampleHandler.__module__,
83
 
                              'bzrlib.transport.chroot'],
 
66
            my_handlers = {}
 
67
            _set_protocol_handlers(my_handlers)
 
68
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
69
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
70
            self.assertEqual([SampleHandler.__module__],
84
71
                             _get_transport_modules())
85
72
        finally:
86
73
            _set_protocol_handlers(handlers)
88
75
    def test_transport_dependency(self):
89
76
        """Transport with missing dependency causes no error"""
90
77
        saved_handlers = _get_protocol_handlers()
91
 
        # don't pollute the current handlers
92
 
        _clear_protocol_handlers()
93
78
        try:
94
 
            register_transport_proto('foo')
95
79
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
80
                    'BadTransportHandler')
97
81
            try:
112
96
        """Transport with missing dependency causes no error"""
113
97
        saved_handlers = _get_protocol_handlers()
114
98
        try:
115
 
            _clear_protocol_handlers()
116
 
            register_transport_proto('foo')
 
99
            _set_protocol_handlers({})
117
100
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
101
                    'BackupTransportHandler')
119
102
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
123
106
        finally:
124
107
            _set_protocol_handlers(saved_handlers)
125
108
 
126
 
    def test_LateReadError(self):
127
 
        """The LateReadError helper should raise on read()."""
128
 
        a_file = LateReadError('a path')
129
 
        try:
130
 
            a_file.read()
131
 
        except ReadError, error:
132
 
            self.assertEqual('a path', error.path)
133
 
        self.assertRaises(ReadError, a_file.read, 40)
134
 
        a_file.close()
135
 
 
136
109
    def test__combine_paths(self):
137
110
        t = Transport('/')
138
111
        self.assertEqual('/home/sarah/project/foo',
144
117
        self.assertEqual('/etc',
145
118
                         t._combine_paths('/home/sarah', '/etc'))
146
119
 
147
 
    def test_local_abspath_non_local_transport(self):
148
 
        # the base implementation should throw
149
 
        t = MemoryTransport()
150
 
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
151
 
        self.assertEqual('memory:///t is not a local path.', str(e))
152
 
 
153
120
 
154
121
class TestCoalesceOffsets(TestCase):
155
 
 
156
 
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
122
    
 
123
    def check(self, expected, offsets, limit=0, fudge=0):
157
124
        coalesce = Transport._coalesce_offsets
158
125
        exp = [_CoalescedOffset(*x) for x in expected]
159
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
160
 
                            max_size=max_size))
 
126
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
161
127
        self.assertEqual(exp, out)
162
128
 
163
129
    def test_coalesce_empty(self):
170
136
        self.check([(0, 10, [(0, 10)]),
171
137
                    (20, 10, [(0, 10)]),
172
138
                   ], [(0, 10), (20, 10)])
173
 
 
 
139
            
174
140
    def test_coalesce_unsorted(self):
175
141
        self.check([(20, 10, [(0, 10)]),
176
142
                    (0, 10, [(0, 10)]),
180
146
        self.check([(0, 20, [(0, 10), (10, 10)])],
181
147
                   [(0, 10), (10, 10)])
182
148
 
183
 
    # XXX: scary, http.readv() can't handle that --vila20071209
184
149
    def test_coalesce_overlapped(self):
185
150
        self.check([(0, 15, [(0, 10), (5, 10)])],
186
151
                   [(0, 10), (5, 10)])
210
175
                   ], [(10, 10), (30, 10), (100, 10)],
211
176
                   fudge=10
212
177
                  )
213
 
    def test_coalesce_max_size(self):
214
 
        self.check([(10, 20, [(0, 10), (10, 10)]),
215
 
                    (30, 50, [(0, 50)]),
216
 
                    # If one range is above max_size, it gets its own coalesced
217
 
                    # offset
218
 
                    (100, 80, [(0, 80),]),],
219
 
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
220
 
                   max_size=50
221
 
                  )
222
 
 
223
 
    def test_coalesce_no_max_size(self):
224
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
225
 
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
226
 
                  )
227
178
 
228
179
 
229
180
class TestMemoryTransport(TestCase):
315
266
    def test_parameters(self):
316
267
        transport = MemoryTransport()
317
268
        self.assertEqual(True, transport.listable())
 
269
        self.assertEqual(False, transport.should_cache())
318
270
        self.assertEqual(False, transport.is_readonly())
319
271
 
320
272
    def test_iter_files_recursive(self):
337
289
class ChrootDecoratorTransportTest(TestCase):
338
290
    """Chroot decoration specific tests."""
339
291
 
340
 
    def test_abspath(self):
341
 
        # The abspath is always relative to the chroot_url.
342
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
343
 
        server.setUp()
344
 
        transport = get_transport(server.get_url())
345
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
346
 
 
347
 
        subdir_transport = transport.clone('subdir')
348
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
349
 
        server.tearDown()
350
 
 
351
 
    def test_clone(self):
352
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
353
 
        server.setUp()
354
 
        transport = get_transport(server.get_url())
355
 
        # relpath from root and root path are the same
356
 
        relpath_cloned = transport.clone('foo')
357
 
        abspath_cloned = transport.clone('/foo')
358
 
        self.assertEqual(server, relpath_cloned.server)
359
 
        self.assertEqual(server, abspath_cloned.server)
360
 
        server.tearDown()
361
 
    
362
 
    def test_chroot_url_preserves_chroot(self):
363
 
        """Calling get_transport on a chroot transport's base should produce a
364
 
        transport with exactly the same behaviour as the original chroot
365
 
        transport.
366
 
 
367
 
        This is so that it is not possible to escape a chroot by doing::
368
 
            url = chroot_transport.base
369
 
            parent_url = urlutils.join(url, '..')
370
 
            new_transport = get_transport(parent_url)
371
 
        """
372
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
373
 
        server.setUp()
374
 
        transport = get_transport(server.get_url())
375
 
        new_transport = get_transport(transport.base)
376
 
        self.assertEqual(transport.server, new_transport.server)
377
 
        self.assertEqual(transport.base, new_transport.base)
378
 
        server.tearDown()
379
 
        
380
 
    def test_urljoin_preserves_chroot(self):
381
 
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
382
 
        URL that escapes the intended chroot.
383
 
 
384
 
        This is so that it is not possible to escape a chroot by doing::
385
 
            url = chroot_transport.base
386
 
            parent_url = urlutils.join(url, '..')
387
 
            new_transport = get_transport(parent_url)
388
 
        """
389
 
        server = ChrootServer(get_transport('memory:///path/'))
390
 
        server.setUp()
391
 
        transport = get_transport(server.get_url())
392
 
        self.assertRaises(
393
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
394
 
        server.tearDown()
395
 
 
396
 
 
397
 
class ChrootServerTest(TestCase):
398
 
 
399
292
    def test_construct(self):
400
 
        backing_transport = MemoryTransport()
401
 
        server = ChrootServer(backing_transport)
402
 
        self.assertEqual(backing_transport, server.backing_transport)
403
 
 
404
 
    def test_setUp(self):
405
 
        backing_transport = MemoryTransport()
406
 
        server = ChrootServer(backing_transport)
407
 
        server.setUp()
408
 
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
409
 
 
410
 
    def test_tearDown(self):
411
 
        backing_transport = MemoryTransport()
412
 
        server = ChrootServer(backing_transport)
413
 
        server.setUp()
414
 
        server.tearDown()
415
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
416
 
 
417
 
    def test_get_url(self):
418
 
        backing_transport = MemoryTransport()
419
 
        server = ChrootServer(backing_transport)
420
 
        server.setUp()
421
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
422
 
        server.tearDown()
 
293
        from bzrlib.transport import chroot
 
294
        transport = chroot.ChrootTransportDecorator('chroot+memory:///pathA/')
 
295
        self.assertEqual('memory:///pathA/', transport.chroot_url)
 
296
 
 
297
        transport = chroot.ChrootTransportDecorator(
 
298
            'chroot+memory:///path/B', chroot='memory:///path/')
 
299
        self.assertEqual('memory:///path/', transport.chroot_url)
 
300
 
 
301
    def test_append_file(self):
 
302
        transport = get_transport('chroot+memory:///foo/bar')
 
303
        self.assertRaises(PathNotChild, transport.append_file, '/foo', None)
 
304
 
 
305
    def test_append_bytes(self):
 
306
        transport = get_transport('chroot+memory:///foo/bar')
 
307
        self.assertRaises(PathNotChild, transport.append_bytes, '/foo', 'bytes')
 
308
 
 
309
    def test_clone(self):
 
310
        transport = get_transport('chroot+memory:///foo/bar')
 
311
        self.assertRaises(PathNotChild, transport.clone, '/foo')
 
312
 
 
313
    def test_delete(self):
 
314
        transport = get_transport('chroot+memory:///foo/bar')
 
315
        self.assertRaises(PathNotChild, transport.delete, '/foo')
 
316
 
 
317
    def test_delete_tree(self):
 
318
        transport = get_transport('chroot+memory:///foo/bar')
 
319
        self.assertRaises(PathNotChild, transport.delete_tree, '/foo')
 
320
 
 
321
    def test_get(self):
 
322
        transport = get_transport('chroot+memory:///foo/bar')
 
323
        self.assertRaises(PathNotChild, transport.get, '/foo')
 
324
 
 
325
    def test_get_bytes(self):
 
326
        transport = get_transport('chroot+memory:///foo/bar')
 
327
        self.assertRaises(PathNotChild, transport.get_bytes, '/foo')
 
328
 
 
329
    def test_has(self):
 
330
        transport = get_transport('chroot+memory:///foo/bar')
 
331
        self.assertRaises(PathNotChild, transport.has, '/foo')
 
332
 
 
333
    def test_list_dir(self):
 
334
        transport = get_transport('chroot+memory:///foo/bar')
 
335
        self.assertRaises(PathNotChild, transport.list_dir, '/foo')
 
336
 
 
337
    def test_lock_read(self):
 
338
        transport = get_transport('chroot+memory:///foo/bar')
 
339
        self.assertRaises(PathNotChild, transport.lock_read, '/foo')
 
340
 
 
341
    def test_lock_write(self):
 
342
        transport = get_transport('chroot+memory:///foo/bar')
 
343
        self.assertRaises(PathNotChild, transport.lock_write, '/foo')
 
344
 
 
345
    def test_mkdir(self):
 
346
        transport = get_transport('chroot+memory:///foo/bar')
 
347
        self.assertRaises(PathNotChild, transport.mkdir, '/foo')
 
348
 
 
349
    def test_put_bytes(self):
 
350
        transport = get_transport('chroot+memory:///foo/bar')
 
351
        self.assertRaises(PathNotChild, transport.put_bytes, '/foo', 'bytes')
 
352
 
 
353
    def test_put_file(self):
 
354
        transport = get_transport('chroot+memory:///foo/bar')
 
355
        self.assertRaises(PathNotChild, transport.put_file, '/foo', None)
 
356
 
 
357
    def test_rename(self):
 
358
        transport = get_transport('chroot+memory:///foo/bar')
 
359
        self.assertRaises(PathNotChild, transport.rename, '/aaa', 'bbb')
 
360
        self.assertRaises(PathNotChild, transport.rename, 'ccc', '/d')
 
361
 
 
362
    def test_rmdir(self):
 
363
        transport = get_transport('chroot+memory:///foo/bar')
 
364
        self.assertRaises(PathNotChild, transport.rmdir, '/foo')
 
365
 
 
366
    def test_stat(self):
 
367
        transport = get_transport('chroot+memory:///foo/bar')
 
368
        self.assertRaises(PathNotChild, transport.stat, '/foo')
423
369
 
424
370
 
425
371
class ReadonlyDecoratorTransportTest(TestCase):
430
376
        # connect to . in readonly mode
431
377
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
432
378
        self.assertEqual(True, transport.listable())
 
379
        self.assertEqual(False, transport.should_cache())
433
380
        self.assertEqual(True, transport.is_readonly())
434
381
 
435
382
    def test_http_parameters(self):
436
 
        from bzrlib.tests.http_server import HttpServer
 
383
        from bzrlib.tests.HttpServer import HttpServer
437
384
        import bzrlib.transport.readonly as readonly
438
 
        # connect to '.' via http which is not listable
 
385
        # connect to . via http which is not listable
439
386
        server = HttpServer()
440
387
        server.setUp()
441
388
        try:
443
390
            self.failUnless(isinstance(transport,
444
391
                                       readonly.ReadonlyTransportDecorator))
445
392
            self.assertEqual(False, transport.listable())
 
393
            self.assertEqual(True, transport.should_cache())
446
394
            self.assertEqual(True, transport.is_readonly())
447
395
        finally:
448
396
            server.tearDown()
457
405
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
458
406
 
459
407
    def test_local_parameters(self):
460
 
        # the listable and is_readonly parameters
 
408
        # the listable, should_cache and is_readonly parameters
461
409
        # are not changed by the fakenfs decorator
462
410
        transport = self.get_nfs_transport('.')
463
411
        self.assertEqual(True, transport.listable())
 
412
        self.assertEqual(False, transport.should_cache())
464
413
        self.assertEqual(False, transport.is_readonly())
465
414
 
466
415
    def test_http_parameters(self):
467
 
        # the listable and is_readonly parameters
 
416
        # the listable, should_cache and is_readonly parameters
468
417
        # are not changed by the fakenfs decorator
469
 
        from bzrlib.tests.http_server import HttpServer
470
 
        # connect to '.' via http which is not listable
 
418
        from bzrlib.tests.HttpServer import HttpServer
 
419
        # connect to . via http which is not listable
471
420
        server = HttpServer()
472
421
        server.setUp()
473
422
        try:
475
424
            self.assertIsInstance(
476
425
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
477
426
            self.assertEqual(False, transport.listable())
 
427
            self.assertEqual(True, transport.should_cache())
478
428
            self.assertEqual(True, transport.is_readonly())
479
429
        finally:
480
430
            server.tearDown()
501
451
        transport = self.get_nfs_transport('.')
502
452
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
503
453
                        transport=transport)
504
 
        self.assertRaises(errors.ResourceBusy,
 
454
        self.assertRaises(bzrlib.errors.ResourceBusy,
505
455
                          transport.rename, 'from', 'to')
506
456
 
507
457
 
565
515
        super(TestTransportImplementation, self).setUp()
566
516
        self._server = self.transport_server()
567
517
        self._server.setUp()
568
 
        self.addCleanup(self._server.tearDown)
569
 
 
570
 
    def get_transport(self, relpath=None):
571
 
        """Return a connected transport to the local directory.
572
 
 
573
 
        :param relpath: a path relative to the base url.
574
 
        """
 
518
 
 
519
    def tearDown(self):
 
520
        super(TestTransportImplementation, self).tearDown()
 
521
        self._server.tearDown()
 
522
        
 
523
    def get_transport(self):
 
524
        """Return a connected transport to the local directory."""
575
525
        base_url = self._server.get_url()
576
 
        url = self._adjust_url(base_url, relpath)
577
526
        # try getting the transport via the regular interface:
578
 
        t = get_transport(url)
579
 
        # vila--20070607 if the following are commented out the test suite
580
 
        # still pass. Is this really still needed or was it a forgotten
581
 
        # temporary fix ?
 
527
        t = get_transport(base_url)
582
528
        if not isinstance(t, self.transport_class):
583
529
            # we did not get the correct transport class type. Override the
584
530
            # regular connection behaviour by direct construction.
585
 
            t = self.transport_class(url)
 
531
            t = self.transport_class(base_url)
586
532
        return t
587
533
 
588
534
 
589
535
class TestLocalTransports(TestCase):
590
536
 
591
537
    def test_get_transport_from_abspath(self):
592
 
        here = osutils.abspath('.')
 
538
        here = os.path.abspath('.')
593
539
        t = get_transport(here)
594
540
        self.assertIsInstance(t, LocalTransport)
595
541
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
596
542
 
597
543
    def test_get_transport_from_relpath(self):
598
 
        here = osutils.abspath('.')
 
544
        here = os.path.abspath('.')
599
545
        t = get_transport('.')
600
546
        self.assertIsInstance(t, LocalTransport)
601
547
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
602
548
 
603
549
    def test_get_transport_from_local_url(self):
604
 
        here = osutils.abspath('.')
 
550
        here = os.path.abspath('.')
605
551
        here_url = urlutils.local_path_to_url(here) + '/'
606
552
        t = get_transport(here_url)
607
553
        self.assertIsInstance(t, LocalTransport)
608
554
        self.assertEquals(t.base, here_url)
609
555
 
610
 
    def test_local_abspath(self):
611
 
        here = osutils.abspath('.')
612
 
        t = get_transport(here)
613
 
        self.assertEquals(t.local_abspath(''), here)
614
 
 
615
556
 
616
557
class TestWin32LocalTransport(TestCase):
617
558
 
626
567
        # make sure we reach the root
627
568
        t = t.clone('..')
628
569
        self.assertEquals(t.base, 'file://HOST/')
629
 
 
630
 
 
631
 
class TestConnectedTransport(TestCase):
632
 
    """Tests for connected to remote server transports"""
633
 
 
634
 
    def test_parse_url(self):
635
 
        t = ConnectedTransport('http://simple.example.com/home/source')
636
 
        self.assertEquals(t._host, 'simple.example.com')
637
 
        self.assertEquals(t._port, None)
638
 
        self.assertEquals(t._path, '/home/source/')
639
 
        self.failUnless(t._user is None)
640
 
        self.failUnless(t._password is None)
641
 
 
642
 
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
643
 
 
644
 
    def test_parse_url_with_at_in_user(self):
645
 
        # Bug 228058
646
 
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
647
 
        self.assertEquals(t._user, 'user@host.com')
648
 
 
649
 
    def test_parse_quoted_url(self):
650
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
651
 
        self.assertEquals(t._host, 'exAmple.com')
652
 
        self.assertEquals(t._port, 2222)
653
 
        self.assertEquals(t._user, 'robey')
654
 
        self.assertEquals(t._password, 'h@t')
655
 
        self.assertEquals(t._path, '/path/')
656
 
 
657
 
        # Base should not keep track of the password
658
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
659
 
 
660
 
    def test_parse_invalid_url(self):
661
 
        self.assertRaises(errors.InvalidURL,
662
 
                          ConnectedTransport,
663
 
                          'sftp://lily.org:~janneke/public/bzr/gub')
664
 
 
665
 
    def test_relpath(self):
666
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
667
 
 
668
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
669
 
        self.assertRaises(errors.PathNotChild, t.relpath,
670
 
                          'http://user@host.com/abs/path/sub')
671
 
        self.assertRaises(errors.PathNotChild, t.relpath,
672
 
                          'sftp://user2@host.com/abs/path/sub')
673
 
        self.assertRaises(errors.PathNotChild, t.relpath,
674
 
                          'sftp://user@otherhost.com/abs/path/sub')
675
 
        self.assertRaises(errors.PathNotChild, t.relpath,
676
 
                          'sftp://user@host.com:33/abs/path/sub')
677
 
        # Make sure it works when we don't supply a username
678
 
        t = ConnectedTransport('sftp://host.com/abs/path')
679
 
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
680
 
 
681
 
        # Make sure it works when parts of the path will be url encoded
682
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
683
 
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
684
 
 
685
 
    def test_connection_sharing_propagate_credentials(self):
686
 
        t = ConnectedTransport('ftp://user@host.com/abs/path')
687
 
        self.assertEquals('user', t._user)
688
 
        self.assertEquals('host.com', t._host)
689
 
        self.assertIs(None, t._get_connection())
690
 
        self.assertIs(None, t._password)
691
 
        c = t.clone('subdir')
692
 
        self.assertIs(None, c._get_connection())
693
 
        self.assertIs(None, t._password)
694
 
 
695
 
        # Simulate the user entering a password
696
 
        password = 'secret'
697
 
        connection = object()
698
 
        t._set_connection(connection, password)
699
 
        self.assertIs(connection, t._get_connection())
700
 
        self.assertIs(password, t._get_credentials())
701
 
        self.assertIs(connection, c._get_connection())
702
 
        self.assertIs(password, c._get_credentials())
703
 
 
704
 
        # credentials can be updated
705
 
        new_password = 'even more secret'
706
 
        c._update_credentials(new_password)
707
 
        self.assertIs(connection, t._get_connection())
708
 
        self.assertIs(new_password, t._get_credentials())
709
 
        self.assertIs(connection, c._get_connection())
710
 
        self.assertIs(new_password, c._get_credentials())
711
 
 
712
 
 
713
 
class TestReusedTransports(TestCase):
714
 
    """Tests for transport reuse"""
715
 
 
716
 
    def test_reuse_same_transport(self):
717
 
        possible_transports = []
718
 
        t1 = get_transport('http://foo/',
719
 
                           possible_transports=possible_transports)
720
 
        self.assertEqual([t1], possible_transports)
721
 
        t2 = get_transport('http://foo/', possible_transports=[t1])
722
 
        self.assertIs(t1, t2)
723
 
 
724
 
        # Also check that final '/' are handled correctly
725
 
        t3 = get_transport('http://foo/path/')
726
 
        t4 = get_transport('http://foo/path', possible_transports=[t3])
727
 
        self.assertIs(t3, t4)
728
 
 
729
 
        t5 = get_transport('http://foo/path')
730
 
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
731
 
        self.assertIs(t5, t6)
732
 
 
733
 
    def test_don_t_reuse_different_transport(self):
734
 
        t1 = get_transport('http://foo/path')
735
 
        t2 = get_transport('http://bar/path', possible_transports=[t1])
736
 
        self.assertIsNot(t1, t2)
737
 
 
738
 
 
739
 
class TestTransportTrace(TestCase):
740
 
 
741
 
    def test_get(self):
742
 
        transport = get_transport('trace+memory://')
743
 
        self.assertIsInstance(
744
 
            transport, bzrlib.transport.trace.TransportTraceDecorator)
745
 
 
746
 
    def test_clone_preserves_activity(self):
747
 
        transport = get_transport('trace+memory://')
748
 
        transport2 = transport.clone('.')
749
 
        self.assertTrue(transport is not transport2)
750
 
        self.assertTrue(transport._activity is transport2._activity)
751
 
 
752
 
    # the following specific tests are for the operations that have made use of
753
 
    # logging in tests; we could test every single operation but doing that
754
 
    # still won't cause a test failure when the top level Transport API
755
 
    # changes; so there is little return doing that.
756
 
    def test_get(self):
757
 
        transport = get_transport('trace+memory:///')
758
 
        transport.put_bytes('foo', 'barish')
759
 
        transport.get('foo')
760
 
        expected_result = []
761
 
        # put_bytes records the bytes, not the content to avoid memory
762
 
        # pressure.
763
 
        expected_result.append(('put_bytes', 'foo', 6, None))
764
 
        # get records the file name only.
765
 
        expected_result.append(('get', 'foo'))
766
 
        self.assertEqual(expected_result, transport._activity)
767
 
 
768
 
    def test_readv(self):
769
 
        transport = get_transport('trace+memory:///')
770
 
        transport.put_bytes('foo', 'barish')
771
 
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
772
 
            upper_limit=6))
773
 
        expected_result = []
774
 
        # put_bytes records the bytes, not the content to avoid memory
775
 
        # pressure.
776
 
        expected_result.append(('put_bytes', 'foo', 6, None))
777
 
        # readv records the supplied offset request
778
 
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
779
 
        self.assertEqual(expected_result, transport._activity)