~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Martin Pool
  • Date: 2007-04-04 06:17:31 UTC
  • mto: This revision was merged to the branch mainline in revision 2397.
  • Revision ID: mbp@sourcefrog.net-20070404061731-tt2xrzllqhbodn83
Contents of TODO file moved into bug tracker

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
import os
 
19
import sys
 
20
import stat
18
21
from cStringIO import StringIO
19
22
 
20
23
import bzrlib
21
 
from bzrlib import (
22
 
    errors,
23
 
    osutils,
24
 
    urlutils,
25
 
    )
26
 
from bzrlib.errors import (DependencyNotPresent,
 
24
from bzrlib import urlutils
 
25
from bzrlib.errors import (ConnectionError,
 
26
                           DependencyNotPresent,
27
27
                           FileExists,
28
28
                           InvalidURLJoin,
29
29
                           NoSuchFile,
30
30
                           PathNotChild,
31
 
                           ReadError,
 
31
                           TransportNotPossible,
32
32
                           UnsupportedProtocol,
33
33
                           )
34
34
from bzrlib.tests import TestCase, TestCaseInTempDir
35
 
from bzrlib.transport import (_clear_protocol_handlers,
36
 
                              _CoalescedOffset,
37
 
                              ConnectedTransport,
 
35
from bzrlib.transport import (_CoalescedOffset,
38
36
                              _get_protocol_handlers,
39
 
                              _set_protocol_handlers,
40
37
                              _get_transport_modules,
41
38
                              get_transport,
42
 
                              LateReadError,
 
39
                              _protocol_handlers,
43
40
                              register_lazy_transport,
44
 
                              register_transport_proto,
 
41
                              _set_protocol_handlers,
45
42
                              Transport,
46
43
                              )
47
44
from bzrlib.transport.chroot import ChrootServer
58
55
 
59
56
    def test__get_set_protocol_handlers(self):
60
57
        handlers = _get_protocol_handlers()
61
 
        self.assertNotEqual([], handlers.keys( ))
 
58
        self.assertNotEqual({}, handlers)
62
59
        try:
63
 
            _clear_protocol_handlers()
64
 
            self.assertEqual([], _get_protocol_handlers().keys())
 
60
            _set_protocol_handlers({})
 
61
            self.assertEqual({}, _get_protocol_handlers())
65
62
        finally:
66
63
            _set_protocol_handlers(handlers)
67
64
 
68
65
    def test_get_transport_modules(self):
69
66
        handlers = _get_protocol_handlers()
70
 
        # don't pollute the current handlers
71
 
        _clear_protocol_handlers()
72
67
        class SampleHandler(object):
73
68
            """I exist, isn't that enough?"""
74
69
        try:
75
 
            _clear_protocol_handlers()
76
 
            register_transport_proto('foo')
77
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
 
                                    'TestTransport.SampleHandler')
79
 
            register_transport_proto('bar')
80
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
 
                                    'TestTransport.SampleHandler')
82
 
            self.assertEqual([SampleHandler.__module__,
83
 
                              'bzrlib.transport.chroot'],
 
70
            my_handlers = {}
 
71
            _set_protocol_handlers(my_handlers)
 
72
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
73
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
74
            self.assertEqual([SampleHandler.__module__, 'bzrlib.transport.chroot'],
84
75
                             _get_transport_modules())
85
76
        finally:
86
77
            _set_protocol_handlers(handlers)
88
79
    def test_transport_dependency(self):
89
80
        """Transport with missing dependency causes no error"""
90
81
        saved_handlers = _get_protocol_handlers()
91
 
        # don't pollute the current handlers
92
 
        _clear_protocol_handlers()
93
82
        try:
94
 
            register_transport_proto('foo')
95
83
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
84
                    'BadTransportHandler')
97
85
            try:
107
95
        finally:
108
96
            # restore original values
109
97
            _set_protocol_handlers(saved_handlers)
110
 
 
 
98
            
111
99
    def test_transport_fallback(self):
112
100
        """Transport with missing dependency causes no error"""
113
101
        saved_handlers = _get_protocol_handlers()
114
102
        try:
115
 
            _clear_protocol_handlers()
116
 
            register_transport_proto('foo')
 
103
            _set_protocol_handlers({})
117
104
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
105
                    'BackupTransportHandler')
119
106
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
123
110
        finally:
124
111
            _set_protocol_handlers(saved_handlers)
125
112
 
126
 
    def test_ssh_hints(self):
127
 
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
128
 
        try:
129
 
            get_transport('ssh://fooserver/foo')
130
 
        except UnsupportedProtocol, e:
131
 
            e_str = str(e)
132
 
            self.assertEquals('Unsupported protocol'
133
 
                              ' for url "ssh://fooserver/foo":'
134
 
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
135
 
                              str(e))
136
 
        else:
137
 
            self.fail('Did not raise UnsupportedProtocol')
138
 
 
139
 
    def test_LateReadError(self):
140
 
        """The LateReadError helper should raise on read()."""
141
 
        a_file = LateReadError('a path')
142
 
        try:
143
 
            a_file.read()
144
 
        except ReadError, error:
145
 
            self.assertEqual('a path', error.path)
146
 
        self.assertRaises(ReadError, a_file.read, 40)
147
 
        a_file.close()
148
 
 
149
113
    def test__combine_paths(self):
150
114
        t = Transport('/')
151
115
        self.assertEqual('/home/sarah/project/foo',
157
121
        self.assertEqual('/etc',
158
122
                         t._combine_paths('/home/sarah', '/etc'))
159
123
 
160
 
    def test_local_abspath_non_local_transport(self):
161
 
        # the base implementation should throw
162
 
        t = MemoryTransport()
163
 
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
164
 
        self.assertEqual('memory:///t is not a local path.', str(e))
165
 
 
166
124
 
167
125
class TestCoalesceOffsets(TestCase):
168
 
 
169
 
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
126
    
 
127
    def check(self, expected, offsets, limit=0, fudge=0):
170
128
        coalesce = Transport._coalesce_offsets
171
129
        exp = [_CoalescedOffset(*x) for x in expected]
172
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
173
 
                            max_size=max_size))
 
130
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
174
131
        self.assertEqual(exp, out)
175
132
 
176
133
    def test_coalesce_empty(self):
183
140
        self.check([(0, 10, [(0, 10)]),
184
141
                    (20, 10, [(0, 10)]),
185
142
                   ], [(0, 10), (20, 10)])
186
 
 
 
143
            
187
144
    def test_coalesce_unsorted(self):
188
145
        self.check([(20, 10, [(0, 10)]),
189
146
                    (0, 10, [(0, 10)]),
194
151
                   [(0, 10), (10, 10)])
195
152
 
196
153
    def test_coalesce_overlapped(self):
197
 
        self.assertRaises(ValueError,
198
 
            self.check, [(0, 15, [(0, 10), (5, 10)])],
199
 
                        [(0, 10), (5, 10)])
 
154
        self.check([(0, 15, [(0, 10), (5, 10)])],
 
155
                   [(0, 10), (5, 10)])
200
156
 
201
157
    def test_coalesce_limit(self):
202
158
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
223
179
                   ], [(10, 10), (30, 10), (100, 10)],
224
180
                   fudge=10
225
181
                  )
226
 
    def test_coalesce_max_size(self):
227
 
        self.check([(10, 20, [(0, 10), (10, 10)]),
228
 
                    (30, 50, [(0, 50)]),
229
 
                    # If one range is above max_size, it gets its own coalesced
230
 
                    # offset
231
 
                    (100, 80, [(0, 80),]),],
232
 
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
233
 
                   max_size=50
234
 
                  )
235
 
 
236
 
    def test_coalesce_no_max_size(self):
237
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
238
 
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
239
 
                  )
240
 
 
241
 
    def test_coalesce_default_limit(self):
242
 
        # By default we use a 100MB max size.
243
 
        ten_mb = 10*1024*1024
244
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
245
 
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
246
 
                   [(i*ten_mb, ten_mb) for i in range(11)])
247
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
248
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
249
 
                   max_size=1*1024*1024*1024)
250
182
 
251
183
 
252
184
class TestMemoryTransport(TestCase):
338
270
    def test_parameters(self):
339
271
        transport = MemoryTransport()
340
272
        self.assertEqual(True, transport.listable())
 
273
        self.assertEqual(False, transport.should_cache())
341
274
        self.assertEqual(False, transport.is_readonly())
342
275
 
343
276
    def test_iter_files_recursive(self):
381
314
        self.assertEqual(server, relpath_cloned.server)
382
315
        self.assertEqual(server, abspath_cloned.server)
383
316
        server.tearDown()
384
 
 
 
317
    
385
318
    def test_chroot_url_preserves_chroot(self):
386
319
        """Calling get_transport on a chroot transport's base should produce a
387
320
        transport with exactly the same behaviour as the original chroot
399
332
        self.assertEqual(transport.server, new_transport.server)
400
333
        self.assertEqual(transport.base, new_transport.base)
401
334
        server.tearDown()
402
 
 
 
335
        
403
336
    def test_urljoin_preserves_chroot(self):
404
337
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
405
338
        URL that escapes the intended chroot.
428
361
        backing_transport = MemoryTransport()
429
362
        server = ChrootServer(backing_transport)
430
363
        server.setUp()
431
 
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
364
        self.assertTrue(server.scheme in _protocol_handlers.keys())
432
365
 
433
366
    def test_tearDown(self):
434
367
        backing_transport = MemoryTransport()
435
368
        server = ChrootServer(backing_transport)
436
369
        server.setUp()
437
370
        server.tearDown()
438
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
371
        self.assertFalse(server.scheme in _protocol_handlers.keys())
439
372
 
440
373
    def test_get_url(self):
441
374
        backing_transport = MemoryTransport()
453
386
        # connect to . in readonly mode
454
387
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
455
388
        self.assertEqual(True, transport.listable())
 
389
        self.assertEqual(False, transport.should_cache())
456
390
        self.assertEqual(True, transport.is_readonly())
457
391
 
458
392
    def test_http_parameters(self):
459
 
        from bzrlib.tests.http_server import HttpServer
 
393
        from bzrlib.tests.HttpServer import HttpServer
460
394
        import bzrlib.transport.readonly as readonly
461
 
        # connect to '.' via http which is not listable
 
395
        # connect to . via http which is not listable
462
396
        server = HttpServer()
463
397
        server.setUp()
464
398
        try:
466
400
            self.failUnless(isinstance(transport,
467
401
                                       readonly.ReadonlyTransportDecorator))
468
402
            self.assertEqual(False, transport.listable())
 
403
            self.assertEqual(True, transport.should_cache())
469
404
            self.assertEqual(True, transport.is_readonly())
470
405
        finally:
471
406
            server.tearDown()
480
415
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
481
416
 
482
417
    def test_local_parameters(self):
483
 
        # the listable and is_readonly parameters
 
418
        # the listable, should_cache and is_readonly parameters
484
419
        # are not changed by the fakenfs decorator
485
420
        transport = self.get_nfs_transport('.')
486
421
        self.assertEqual(True, transport.listable())
 
422
        self.assertEqual(False, transport.should_cache())
487
423
        self.assertEqual(False, transport.is_readonly())
488
424
 
489
425
    def test_http_parameters(self):
490
 
        # the listable and is_readonly parameters
 
426
        # the listable, should_cache and is_readonly parameters
491
427
        # are not changed by the fakenfs decorator
492
 
        from bzrlib.tests.http_server import HttpServer
493
 
        # connect to '.' via http which is not listable
 
428
        from bzrlib.tests.HttpServer import HttpServer
 
429
        # connect to . via http which is not listable
494
430
        server = HttpServer()
495
431
        server.setUp()
496
432
        try:
498
434
            self.assertIsInstance(
499
435
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
500
436
            self.assertEqual(False, transport.listable())
 
437
            self.assertEqual(True, transport.should_cache())
501
438
            self.assertEqual(True, transport.is_readonly())
502
439
        finally:
503
440
            server.tearDown()
524
461
        transport = self.get_nfs_transport('.')
525
462
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
526
463
                        transport=transport)
527
 
        self.assertRaises(errors.ResourceBusy,
 
464
        self.assertRaises(bzrlib.errors.ResourceBusy,
528
465
                          transport.rename, 'from', 'to')
529
466
 
530
467
 
564
501
 
565
502
class TestTransportImplementation(TestCaseInTempDir):
566
503
    """Implementation verification for transports.
567
 
 
 
504
    
568
505
    To verify a transport we need a server factory, which is a callable
569
506
    that accepts no parameters and returns an implementation of
570
507
    bzrlib.transport.Server.
571
 
 
 
508
    
572
509
    That Server is then used to construct transport instances and test
573
510
    the transport via loopback activity.
574
511
 
575
 
    Currently this assumes that the Transport object is connected to the
576
 
    current working directory.  So that whatever is done
577
 
    through the transport, should show up in the working
 
512
    Currently this assumes that the Transport object is connected to the 
 
513
    current working directory.  So that whatever is done 
 
514
    through the transport, should show up in the working 
578
515
    directory, and vice-versa. This is a bug, because it's possible to have
579
 
    URL schemes which provide access to something that may not be
580
 
    result in storage on the local disk, i.e. due to file system limits, or
 
516
    URL schemes which provide access to something that may not be 
 
517
    result in storage on the local disk, i.e. due to file system limits, or 
581
518
    due to it being a database or some other non-filesystem tool.
582
519
 
583
520
    This also tests to make sure that the functions work with both
584
521
    generators and lists (assuming iter(list) is effectively a generator)
585
522
    """
586
 
 
 
523
    
587
524
    def setUp(self):
588
525
        super(TestTransportImplementation, self).setUp()
589
526
        self._server = self.transport_server()
590
527
        self._server.setUp()
591
528
        self.addCleanup(self._server.tearDown)
592
529
 
593
 
    def get_transport(self, relpath=None):
594
 
        """Return a connected transport to the local directory.
595
 
 
596
 
        :param relpath: a path relative to the base url.
597
 
        """
 
530
    def get_transport(self):
 
531
        """Return a connected transport to the local directory."""
598
532
        base_url = self._server.get_url()
599
 
        url = self._adjust_url(base_url, relpath)
600
533
        # try getting the transport via the regular interface:
601
 
        t = get_transport(url)
602
 
        # vila--20070607 if the following are commented out the test suite
603
 
        # still passes. Is this really still needed or was it a forgotten
604
 
        # temporary fix ?
 
534
        t = get_transport(base_url)
605
535
        if not isinstance(t, self.transport_class):
606
536
            # we did not get the correct transport class type. Override the
607
537
            # regular connection behaviour by direct construction.
608
 
            t = self.transport_class(url)
 
538
            t = self.transport_class(base_url)
609
539
        return t
610
540
 
611
541
 
612
542
class TestLocalTransports(TestCase):
613
543
 
614
544
    def test_get_transport_from_abspath(self):
615
 
        here = osutils.abspath('.')
 
545
        here = os.path.abspath('.')
616
546
        t = get_transport(here)
617
547
        self.assertIsInstance(t, LocalTransport)
618
548
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
619
549
 
620
550
    def test_get_transport_from_relpath(self):
621
 
        here = osutils.abspath('.')
 
551
        here = os.path.abspath('.')
622
552
        t = get_transport('.')
623
553
        self.assertIsInstance(t, LocalTransport)
624
554
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
625
555
 
626
556
    def test_get_transport_from_local_url(self):
627
 
        here = osutils.abspath('.')
 
557
        here = os.path.abspath('.')
628
558
        here_url = urlutils.local_path_to_url(here) + '/'
629
559
        t = get_transport(here_url)
630
560
        self.assertIsInstance(t, LocalTransport)
631
561
        self.assertEquals(t.base, here_url)
632
562
 
633
 
    def test_local_abspath(self):
634
 
        here = osutils.abspath('.')
635
 
        t = get_transport(here)
636
 
        self.assertEquals(t.local_abspath(''), here)
637
 
 
638
563
 
639
564
class TestWin32LocalTransport(TestCase):
640
565
 
649
574
        # make sure we reach the root
650
575
        t = t.clone('..')
651
576
        self.assertEquals(t.base, 'file://HOST/')
652
 
 
653
 
 
654
 
class TestConnectedTransport(TestCase):
655
 
    """Tests for connected to remote server transports"""
656
 
 
657
 
    def test_parse_url(self):
658
 
        t = ConnectedTransport('http://simple.example.com/home/source')
659
 
        self.assertEquals(t._host, 'simple.example.com')
660
 
        self.assertEquals(t._port, None)
661
 
        self.assertEquals(t._path, '/home/source/')
662
 
        self.failUnless(t._user is None)
663
 
        self.failUnless(t._password is None)
664
 
 
665
 
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
666
 
 
667
 
    def test_parse_url_with_at_in_user(self):
668
 
        # Bug 228058
669
 
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
670
 
        self.assertEquals(t._user, 'user@host.com')
671
 
 
672
 
    def test_parse_quoted_url(self):
673
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
674
 
        self.assertEquals(t._host, 'exAmple.com')
675
 
        self.assertEquals(t._port, 2222)
676
 
        self.assertEquals(t._user, 'robey')
677
 
        self.assertEquals(t._password, 'h@t')
678
 
        self.assertEquals(t._path, '/path/')
679
 
 
680
 
        # Base should not keep track of the password
681
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
682
 
 
683
 
    def test_parse_invalid_url(self):
684
 
        self.assertRaises(errors.InvalidURL,
685
 
                          ConnectedTransport,
686
 
                          'sftp://lily.org:~janneke/public/bzr/gub')
687
 
 
688
 
    def test_relpath(self):
689
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
690
 
 
691
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
692
 
        self.assertRaises(errors.PathNotChild, t.relpath,
693
 
                          'http://user@host.com/abs/path/sub')
694
 
        self.assertRaises(errors.PathNotChild, t.relpath,
695
 
                          'sftp://user2@host.com/abs/path/sub')
696
 
        self.assertRaises(errors.PathNotChild, t.relpath,
697
 
                          'sftp://user@otherhost.com/abs/path/sub')
698
 
        self.assertRaises(errors.PathNotChild, t.relpath,
699
 
                          'sftp://user@host.com:33/abs/path/sub')
700
 
        # Make sure it works when we don't supply a username
701
 
        t = ConnectedTransport('sftp://host.com/abs/path')
702
 
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
703
 
 
704
 
        # Make sure it works when parts of the path will be url encoded
705
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
706
 
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
707
 
 
708
 
    def test_connection_sharing_propagate_credentials(self):
709
 
        t = ConnectedTransport('ftp://user@host.com/abs/path')
710
 
        self.assertEquals('user', t._user)
711
 
        self.assertEquals('host.com', t._host)
712
 
        self.assertIs(None, t._get_connection())
713
 
        self.assertIs(None, t._password)
714
 
        c = t.clone('subdir')
715
 
        self.assertIs(None, c._get_connection())
716
 
        self.assertIs(None, t._password)
717
 
 
718
 
        # Simulate the user entering a password
719
 
        password = 'secret'
720
 
        connection = object()
721
 
        t._set_connection(connection, password)
722
 
        self.assertIs(connection, t._get_connection())
723
 
        self.assertIs(password, t._get_credentials())
724
 
        self.assertIs(connection, c._get_connection())
725
 
        self.assertIs(password, c._get_credentials())
726
 
 
727
 
        # credentials can be updated
728
 
        new_password = 'even more secret'
729
 
        c._update_credentials(new_password)
730
 
        self.assertIs(connection, t._get_connection())
731
 
        self.assertIs(new_password, t._get_credentials())
732
 
        self.assertIs(connection, c._get_connection())
733
 
        self.assertIs(new_password, c._get_credentials())
734
 
 
735
 
 
736
 
class TestReusedTransports(TestCase):
737
 
    """Tests for transport reuse"""
738
 
 
739
 
    def test_reuse_same_transport(self):
740
 
        possible_transports = []
741
 
        t1 = get_transport('http://foo/',
742
 
                           possible_transports=possible_transports)
743
 
        self.assertEqual([t1], possible_transports)
744
 
        t2 = get_transport('http://foo/', possible_transports=[t1])
745
 
        self.assertIs(t1, t2)
746
 
 
747
 
        # Also check that final '/' are handled correctly
748
 
        t3 = get_transport('http://foo/path/')
749
 
        t4 = get_transport('http://foo/path', possible_transports=[t3])
750
 
        self.assertIs(t3, t4)
751
 
 
752
 
        t5 = get_transport('http://foo/path')
753
 
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
754
 
        self.assertIs(t5, t6)
755
 
 
756
 
    def test_don_t_reuse_different_transport(self):
757
 
        t1 = get_transport('http://foo/path')
758
 
        t2 = get_transport('http://bar/path', possible_transports=[t1])
759
 
        self.assertIsNot(t1, t2)
760
 
 
761
 
 
762
 
class TestTransportTrace(TestCase):
763
 
 
764
 
    def test_get(self):
765
 
        transport = get_transport('trace+memory://')
766
 
        self.assertIsInstance(
767
 
            transport, bzrlib.transport.trace.TransportTraceDecorator)
768
 
 
769
 
    def test_clone_preserves_activity(self):
770
 
        transport = get_transport('trace+memory://')
771
 
        transport2 = transport.clone('.')
772
 
        self.assertTrue(transport is not transport2)
773
 
        self.assertTrue(transport._activity is transport2._activity)
774
 
 
775
 
    # the following specific tests are for the operations that have made use of
776
 
    # logging in tests; we could test every single operation but doing that
777
 
    # still won't cause a test failure when the top level Transport API
778
 
    # changes; so there is little return doing that.
779
 
    def test_get(self):
780
 
        transport = get_transport('trace+memory:///')
781
 
        transport.put_bytes('foo', 'barish')
782
 
        transport.get('foo')
783
 
        expected_result = []
784
 
        # put_bytes records the bytes, not the content to avoid memory
785
 
        # pressure.
786
 
        expected_result.append(('put_bytes', 'foo', 6, None))
787
 
        # get records the file name only.
788
 
        expected_result.append(('get', 'foo'))
789
 
        self.assertEqual(expected_result, transport._activity)
790
 
 
791
 
    def test_readv(self):
792
 
        transport = get_transport('trace+memory:///')
793
 
        transport.put_bytes('foo', 'barish')
794
 
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
795
 
            upper_limit=6))
796
 
        expected_result = []
797
 
        # put_bytes records the bytes, not the content to avoid memory
798
 
        # pressure.
799
 
        expected_result.append(('put_bytes', 'foo', 6, None))
800
 
        # readv records the supplied offset request
801
 
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
802
 
        self.assertEqual(expected_result, transport._activity)