1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
from cStringIO import StringIO
26
from bzrlib.errors import (DependencyNotPresent,
34
from bzrlib.tests import TestCase, TestCaseInTempDir
35
from bzrlib.transport import (_clear_protocol_handlers,
38
_get_protocol_handlers,
39
_set_protocol_handlers,
40
_get_transport_modules,
43
register_lazy_transport,
44
register_transport_proto,
47
from bzrlib.transport.chroot import ChrootServer
48
from bzrlib.transport.memory import MemoryTransport
49
from bzrlib.transport.local import (LocalTransport,
50
EmulatedWin32LocalTransport)
53
# TODO: Should possibly split transport-specific tests into their own files.
56
class TestTransport(TestCase):
57
"""Test the non transport-concrete class functionality."""
59
def test__get_set_protocol_handlers(self):
60
handlers = _get_protocol_handlers()
61
self.assertNotEqual([], handlers.keys( ))
63
_clear_protocol_handlers()
64
self.assertEqual([], _get_protocol_handlers().keys())
66
_set_protocol_handlers(handlers)
68
def test_get_transport_modules(self):
69
handlers = _get_protocol_handlers()
70
# don't pollute the current handlers
71
_clear_protocol_handlers()
72
class SampleHandler(object):
73
"""I exist, isnt that enough?"""
75
_clear_protocol_handlers()
76
register_transport_proto('foo')
77
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
'TestTransport.SampleHandler')
79
register_transport_proto('bar')
80
register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
'TestTransport.SampleHandler')
82
self.assertEqual([SampleHandler.__module__,
83
'bzrlib.transport.chroot'],
84
_get_transport_modules())
86
_set_protocol_handlers(handlers)
88
def test_transport_dependency(self):
89
"""Transport with missing dependency causes no error"""
90
saved_handlers = _get_protocol_handlers()
91
# don't pollute the current handlers
92
_clear_protocol_handlers()
94
register_transport_proto('foo')
95
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
'BadTransportHandler')
98
get_transport('foo://fooserver/foo')
99
except UnsupportedProtocol, e:
101
self.assertEquals('Unsupported protocol'
102
' for url "foo://fooserver/foo":'
103
' Unable to import library "some_lib":'
104
' testing missing dependency', str(e))
106
self.fail('Did not raise UnsupportedProtocol')
108
# restore original values
109
_set_protocol_handlers(saved_handlers)
111
def test_transport_fallback(self):
112
"""Transport with missing dependency causes no error"""
113
saved_handlers = _get_protocol_handlers()
115
_clear_protocol_handlers()
116
register_transport_proto('foo')
117
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
'BackupTransportHandler')
119
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
120
'BadTransportHandler')
121
t = get_transport('foo://fooserver/foo')
122
self.assertTrue(isinstance(t, BackupTransportHandler))
124
_set_protocol_handlers(saved_handlers)
126
def test_LateReadError(self):
127
"""The LateReadError helper should raise on read()."""
128
a_file = LateReadError('a path')
131
except ReadError, error:
132
self.assertEqual('a path', error.path)
133
self.assertRaises(ReadError, a_file.read, 40)
136
def test__combine_paths(self):
138
self.assertEqual('/home/sarah/project/foo',
139
t._combine_paths('/home/sarah', 'project/foo'))
140
self.assertEqual('/etc',
141
t._combine_paths('/home/sarah', '../../etc'))
142
self.assertEqual('/etc',
143
t._combine_paths('/home/sarah', '../../../etc'))
144
self.assertEqual('/etc',
145
t._combine_paths('/home/sarah', '/etc'))
147
def test_local_abspath_non_local_transport(self):
148
# the base implementation should throw
149
t = MemoryTransport()
150
e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
151
self.assertEqual('memory:///t is not a local path.', str(e))
154
class TestCoalesceOffsets(TestCase):
156
def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
157
coalesce = Transport._coalesce_offsets
158
exp = [_CoalescedOffset(*x) for x in expected]
159
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
161
self.assertEqual(exp, out)
163
def test_coalesce_empty(self):
166
def test_coalesce_simple(self):
167
self.check([(0, 10, [(0, 10)])], [(0, 10)])
169
def test_coalesce_unrelated(self):
170
self.check([(0, 10, [(0, 10)]),
172
], [(0, 10), (20, 10)])
174
def test_coalesce_unsorted(self):
175
self.check([(20, 10, [(0, 10)]),
177
], [(20, 10), (0, 10)])
179
def test_coalesce_nearby(self):
180
self.check([(0, 20, [(0, 10), (10, 10)])],
183
def test_coalesce_overlapped(self):
184
self.assertRaises(ValueError,
185
self.check, [(0, 15, [(0, 10), (5, 10)])],
188
def test_coalesce_limit(self):
189
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
190
(30, 10), (40, 10)]),
191
(60, 50, [(0, 10), (10, 10), (20, 10),
192
(30, 10), (40, 10)]),
193
], [(10, 10), (20, 10), (30, 10), (40, 10),
194
(50, 10), (60, 10), (70, 10), (80, 10),
195
(90, 10), (100, 10)],
198
def test_coalesce_no_limit(self):
199
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
200
(30, 10), (40, 10), (50, 10),
201
(60, 10), (70, 10), (80, 10),
203
], [(10, 10), (20, 10), (30, 10), (40, 10),
204
(50, 10), (60, 10), (70, 10), (80, 10),
205
(90, 10), (100, 10)])
207
def test_coalesce_fudge(self):
208
self.check([(10, 30, [(0, 10), (20, 10)]),
209
(100, 10, [(0, 10),]),
210
], [(10, 10), (30, 10), (100, 10)],
213
def test_coalesce_max_size(self):
214
self.check([(10, 20, [(0, 10), (10, 10)]),
216
# If one range is above max_size, it gets its own coalesced
218
(100, 80, [(0, 80),]),],
219
[(10, 10), (20, 10), (30, 50), (100, 80)],
223
def test_coalesce_no_max_size(self):
224
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
225
[(10, 10), (20, 10), (30, 50), (80, 100)],
228
def test_coalesce_default_limit(self):
229
# By default we use a 100MB max size.
230
ten_mb = 10*1024*1024
231
self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
232
(10*ten_mb, ten_mb, [(0, ten_mb)])],
233
[(i*ten_mb, ten_mb) for i in range(11)])
234
self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
235
[(i*ten_mb, ten_mb) for i in range(11)],
236
max_size=1*1024*1024*1024)
239
class TestMemoryTransport(TestCase):
241
def test_get_transport(self):
244
def test_clone(self):
245
transport = MemoryTransport()
246
self.assertTrue(isinstance(transport, MemoryTransport))
247
self.assertEqual("memory:///", transport.clone("/").base)
249
def test_abspath(self):
250
transport = MemoryTransport()
251
self.assertEqual("memory:///relpath", transport.abspath('relpath'))
253
def test_abspath_of_root(self):
254
transport = MemoryTransport()
255
self.assertEqual("memory:///", transport.base)
256
self.assertEqual("memory:///", transport.abspath('/'))
258
def test_abspath_of_relpath_starting_at_root(self):
259
transport = MemoryTransport()
260
self.assertEqual("memory:///foo", transport.abspath('/foo'))
262
def test_append_and_get(self):
263
transport = MemoryTransport()
264
transport.append_bytes('path', 'content')
265
self.assertEqual(transport.get('path').read(), 'content')
266
transport.append_file('path', StringIO('content'))
267
self.assertEqual(transport.get('path').read(), 'contentcontent')
269
def test_put_and_get(self):
270
transport = MemoryTransport()
271
transport.put_file('path', StringIO('content'))
272
self.assertEqual(transport.get('path').read(), 'content')
273
transport.put_bytes('path', 'content')
274
self.assertEqual(transport.get('path').read(), 'content')
276
def test_append_without_dir_fails(self):
277
transport = MemoryTransport()
278
self.assertRaises(NoSuchFile,
279
transport.append_bytes, 'dir/path', 'content')
281
def test_put_without_dir_fails(self):
282
transport = MemoryTransport()
283
self.assertRaises(NoSuchFile,
284
transport.put_file, 'dir/path', StringIO('content'))
286
def test_get_missing(self):
287
transport = MemoryTransport()
288
self.assertRaises(NoSuchFile, transport.get, 'foo')
290
def test_has_missing(self):
291
transport = MemoryTransport()
292
self.assertEquals(False, transport.has('foo'))
294
def test_has_present(self):
295
transport = MemoryTransport()
296
transport.append_bytes('foo', 'content')
297
self.assertEquals(True, transport.has('foo'))
299
def test_list_dir(self):
300
transport = MemoryTransport()
301
transport.put_bytes('foo', 'content')
302
transport.mkdir('dir')
303
transport.put_bytes('dir/subfoo', 'content')
304
transport.put_bytes('dirlike', 'content')
306
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
307
self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
309
def test_mkdir(self):
310
transport = MemoryTransport()
311
transport.mkdir('dir')
312
transport.append_bytes('dir/path', 'content')
313
self.assertEqual(transport.get('dir/path').read(), 'content')
315
def test_mkdir_missing_parent(self):
316
transport = MemoryTransport()
317
self.assertRaises(NoSuchFile,
318
transport.mkdir, 'dir/dir')
320
def test_mkdir_twice(self):
321
transport = MemoryTransport()
322
transport.mkdir('dir')
323
self.assertRaises(FileExists, transport.mkdir, 'dir')
325
def test_parameters(self):
326
transport = MemoryTransport()
327
self.assertEqual(True, transport.listable())
328
self.assertEqual(False, transport.is_readonly())
330
def test_iter_files_recursive(self):
331
transport = MemoryTransport()
332
transport.mkdir('dir')
333
transport.put_bytes('dir/foo', 'content')
334
transport.put_bytes('dir/bar', 'content')
335
transport.put_bytes('bar', 'content')
336
paths = set(transport.iter_files_recursive())
337
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
340
transport = MemoryTransport()
341
transport.put_bytes('foo', 'content')
342
transport.put_bytes('bar', 'phowar')
343
self.assertEqual(7, transport.stat('foo').st_size)
344
self.assertEqual(6, transport.stat('bar').st_size)
347
class ChrootDecoratorTransportTest(TestCase):
348
"""Chroot decoration specific tests."""
350
def test_abspath(self):
351
# The abspath is always relative to the chroot_url.
352
server = ChrootServer(get_transport('memory:///foo/bar/'))
354
transport = get_transport(server.get_url())
355
self.assertEqual(server.get_url(), transport.abspath('/'))
357
subdir_transport = transport.clone('subdir')
358
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
361
def test_clone(self):
362
server = ChrootServer(get_transport('memory:///foo/bar/'))
364
transport = get_transport(server.get_url())
365
# relpath from root and root path are the same
366
relpath_cloned = transport.clone('foo')
367
abspath_cloned = transport.clone('/foo')
368
self.assertEqual(server, relpath_cloned.server)
369
self.assertEqual(server, abspath_cloned.server)
372
def test_chroot_url_preserves_chroot(self):
373
"""Calling get_transport on a chroot transport's base should produce a
374
transport with exactly the same behaviour as the original chroot
377
This is so that it is not possible to escape a chroot by doing::
378
url = chroot_transport.base
379
parent_url = urlutils.join(url, '..')
380
new_transport = get_transport(parent_url)
382
server = ChrootServer(get_transport('memory:///path/subpath'))
384
transport = get_transport(server.get_url())
385
new_transport = get_transport(transport.base)
386
self.assertEqual(transport.server, new_transport.server)
387
self.assertEqual(transport.base, new_transport.base)
390
def test_urljoin_preserves_chroot(self):
391
"""Using urlutils.join(url, '..') on a chroot URL should not produce a
392
URL that escapes the intended chroot.
394
This is so that it is not possible to escape a chroot by doing::
395
url = chroot_transport.base
396
parent_url = urlutils.join(url, '..')
397
new_transport = get_transport(parent_url)
399
server = ChrootServer(get_transport('memory:///path/'))
401
transport = get_transport(server.get_url())
403
InvalidURLJoin, urlutils.join, transport.base, '..')
407
class ChrootServerTest(TestCase):
409
def test_construct(self):
410
backing_transport = MemoryTransport()
411
server = ChrootServer(backing_transport)
412
self.assertEqual(backing_transport, server.backing_transport)
414
def test_setUp(self):
415
backing_transport = MemoryTransport()
416
server = ChrootServer(backing_transport)
418
self.assertTrue(server.scheme in _get_protocol_handlers().keys())
420
def test_tearDown(self):
421
backing_transport = MemoryTransport()
422
server = ChrootServer(backing_transport)
425
self.assertFalse(server.scheme in _get_protocol_handlers().keys())
427
def test_get_url(self):
428
backing_transport = MemoryTransport()
429
server = ChrootServer(backing_transport)
431
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
435
class ReadonlyDecoratorTransportTest(TestCase):
436
"""Readonly decoration specific tests."""
438
def test_local_parameters(self):
439
import bzrlib.transport.readonly as readonly
440
# connect to . in readonly mode
441
transport = readonly.ReadonlyTransportDecorator('readonly+.')
442
self.assertEqual(True, transport.listable())
443
self.assertEqual(True, transport.is_readonly())
445
def test_http_parameters(self):
446
from bzrlib.tests.http_server import HttpServer
447
import bzrlib.transport.readonly as readonly
448
# connect to '.' via http which is not listable
449
server = HttpServer()
452
transport = get_transport('readonly+' + server.get_url())
453
self.failUnless(isinstance(transport,
454
readonly.ReadonlyTransportDecorator))
455
self.assertEqual(False, transport.listable())
456
self.assertEqual(True, transport.is_readonly())
461
class FakeNFSDecoratorTests(TestCaseInTempDir):
462
"""NFS decorator specific tests."""
464
def get_nfs_transport(self, url):
465
import bzrlib.transport.fakenfs as fakenfs
466
# connect to url with nfs decoration
467
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
469
def test_local_parameters(self):
470
# the listable and is_readonly parameters
471
# are not changed by the fakenfs decorator
472
transport = self.get_nfs_transport('.')
473
self.assertEqual(True, transport.listable())
474
self.assertEqual(False, transport.is_readonly())
476
def test_http_parameters(self):
477
# the listable and is_readonly parameters
478
# are not changed by the fakenfs decorator
479
from bzrlib.tests.http_server import HttpServer
480
# connect to '.' via http which is not listable
481
server = HttpServer()
484
transport = self.get_nfs_transport(server.get_url())
485
self.assertIsInstance(
486
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
487
self.assertEqual(False, transport.listable())
488
self.assertEqual(True, transport.is_readonly())
492
def test_fakenfs_server_default(self):
493
# a FakeNFSServer() should bring up a local relpath server for itself
494
import bzrlib.transport.fakenfs as fakenfs
495
server = fakenfs.FakeNFSServer()
498
# the url should be decorated appropriately
499
self.assertStartsWith(server.get_url(), 'fakenfs+')
500
# and we should be able to get a transport for it
501
transport = get_transport(server.get_url())
502
# which must be a FakeNFSTransportDecorator instance.
503
self.assertIsInstance(
504
transport, fakenfs.FakeNFSTransportDecorator)
508
def test_fakenfs_rename_semantics(self):
509
# a FakeNFS transport must mangle the way rename errors occur to
510
# look like NFS problems.
511
transport = self.get_nfs_transport('.')
512
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
514
self.assertRaises(errors.ResourceBusy,
515
transport.rename, 'from', 'to')
518
class FakeVFATDecoratorTests(TestCaseInTempDir):
519
"""Tests for simulation of VFAT restrictions"""
521
def get_vfat_transport(self, url):
522
"""Return vfat-backed transport for test directory"""
523
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
524
return FakeVFATTransportDecorator('vfat+' + url)
526
def test_transport_creation(self):
527
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
528
transport = self.get_vfat_transport('.')
529
self.assertIsInstance(transport, FakeVFATTransportDecorator)
531
def test_transport_mkdir(self):
532
transport = self.get_vfat_transport('.')
533
transport.mkdir('HELLO')
534
self.assertTrue(transport.has('hello'))
535
self.assertTrue(transport.has('Hello'))
537
def test_forbidden_chars(self):
538
transport = self.get_vfat_transport('.')
539
self.assertRaises(ValueError, transport.has, "<NU>")
542
class BadTransportHandler(Transport):
543
def __init__(self, base_url):
544
raise DependencyNotPresent('some_lib', 'testing missing dependency')
547
class BackupTransportHandler(Transport):
548
"""Test transport that works as a backup for the BadTransportHandler"""
552
class TestTransportImplementation(TestCaseInTempDir):
553
"""Implementation verification for transports.
555
To verify a transport we need a server factory, which is a callable
556
that accepts no parameters and returns an implementation of
557
bzrlib.transport.Server.
559
That Server is then used to construct transport instances and test
560
the transport via loopback activity.
562
Currently this assumes that the Transport object is connected to the
563
current working directory. So that whatever is done
564
through the transport, should show up in the working
565
directory, and vice-versa. This is a bug, because its possible to have
566
URL schemes which provide access to something that may not be
567
result in storage on the local disk, i.e. due to file system limits, or
568
due to it being a database or some other non-filesystem tool.
570
This also tests to make sure that the functions work with both
571
generators and lists (assuming iter(list) is effectively a generator)
575
super(TestTransportImplementation, self).setUp()
576
self._server = self.transport_server()
578
self.addCleanup(self._server.tearDown)
580
def get_transport(self, relpath=None):
581
"""Return a connected transport to the local directory.
583
:param relpath: a path relative to the base url.
585
base_url = self._server.get_url()
586
url = self._adjust_url(base_url, relpath)
587
# try getting the transport via the regular interface:
588
t = get_transport(url)
589
# vila--20070607 if the following are commented out the test suite
590
# still pass. Is this really still needed or was it a forgotten
592
if not isinstance(t, self.transport_class):
593
# we did not get the correct transport class type. Override the
594
# regular connection behaviour by direct construction.
595
t = self.transport_class(url)
599
class TestLocalTransports(TestCase):
601
def test_get_transport_from_abspath(self):
602
here = osutils.abspath('.')
603
t = get_transport(here)
604
self.assertIsInstance(t, LocalTransport)
605
self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
607
def test_get_transport_from_relpath(self):
608
here = osutils.abspath('.')
609
t = get_transport('.')
610
self.assertIsInstance(t, LocalTransport)
611
self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
613
def test_get_transport_from_local_url(self):
614
here = osutils.abspath('.')
615
here_url = urlutils.local_path_to_url(here) + '/'
616
t = get_transport(here_url)
617
self.assertIsInstance(t, LocalTransport)
618
self.assertEquals(t.base, here_url)
620
def test_local_abspath(self):
621
here = osutils.abspath('.')
622
t = get_transport(here)
623
self.assertEquals(t.local_abspath(''), here)
626
class TestWin32LocalTransport(TestCase):
628
def test_unc_clone_to_root(self):
629
# Win32 UNC path like \\HOST\path
630
# clone to root should stop at least at \\HOST part
632
t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
635
self.assertEquals(t.base, 'file://HOST/')
636
# make sure we reach the root
638
self.assertEquals(t.base, 'file://HOST/')
641
class TestConnectedTransport(TestCase):
642
"""Tests for connected to remote server transports"""
644
def test_parse_url(self):
645
t = ConnectedTransport('http://simple.example.com/home/source')
646
self.assertEquals(t._host, 'simple.example.com')
647
self.assertEquals(t._port, None)
648
self.assertEquals(t._path, '/home/source/')
649
self.failUnless(t._user is None)
650
self.failUnless(t._password is None)
652
self.assertEquals(t.base, 'http://simple.example.com/home/source/')
654
def test_parse_url_with_at_in_user(self):
656
t = ConnectedTransport('ftp://user@host.com@www.host.com/')
657
self.assertEquals(t._user, 'user@host.com')
659
def test_parse_quoted_url(self):
660
t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
661
self.assertEquals(t._host, 'exAmple.com')
662
self.assertEquals(t._port, 2222)
663
self.assertEquals(t._user, 'robey')
664
self.assertEquals(t._password, 'h@t')
665
self.assertEquals(t._path, '/path/')
667
# Base should not keep track of the password
668
self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
670
def test_parse_invalid_url(self):
671
self.assertRaises(errors.InvalidURL,
673
'sftp://lily.org:~janneke/public/bzr/gub')
675
def test_relpath(self):
676
t = ConnectedTransport('sftp://user@host.com/abs/path')
678
self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
679
self.assertRaises(errors.PathNotChild, t.relpath,
680
'http://user@host.com/abs/path/sub')
681
self.assertRaises(errors.PathNotChild, t.relpath,
682
'sftp://user2@host.com/abs/path/sub')
683
self.assertRaises(errors.PathNotChild, t.relpath,
684
'sftp://user@otherhost.com/abs/path/sub')
685
self.assertRaises(errors.PathNotChild, t.relpath,
686
'sftp://user@host.com:33/abs/path/sub')
687
# Make sure it works when we don't supply a username
688
t = ConnectedTransport('sftp://host.com/abs/path')
689
self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
691
# Make sure it works when parts of the path will be url encoded
692
t = ConnectedTransport('sftp://host.com/dev/%path')
693
self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
695
def test_connection_sharing_propagate_credentials(self):
696
t = ConnectedTransport('ftp://user@host.com/abs/path')
697
self.assertEquals('user', t._user)
698
self.assertEquals('host.com', t._host)
699
self.assertIs(None, t._get_connection())
700
self.assertIs(None, t._password)
701
c = t.clone('subdir')
702
self.assertIs(None, c._get_connection())
703
self.assertIs(None, t._password)
705
# Simulate the user entering a password
707
connection = object()
708
t._set_connection(connection, password)
709
self.assertIs(connection, t._get_connection())
710
self.assertIs(password, t._get_credentials())
711
self.assertIs(connection, c._get_connection())
712
self.assertIs(password, c._get_credentials())
714
# credentials can be updated
715
new_password = 'even more secret'
716
c._update_credentials(new_password)
717
self.assertIs(connection, t._get_connection())
718
self.assertIs(new_password, t._get_credentials())
719
self.assertIs(connection, c._get_connection())
720
self.assertIs(new_password, c._get_credentials())
723
class TestReusedTransports(TestCase):
724
"""Tests for transport reuse"""
726
def test_reuse_same_transport(self):
727
possible_transports = []
728
t1 = get_transport('http://foo/',
729
possible_transports=possible_transports)
730
self.assertEqual([t1], possible_transports)
731
t2 = get_transport('http://foo/', possible_transports=[t1])
732
self.assertIs(t1, t2)
734
# Also check that final '/' are handled correctly
735
t3 = get_transport('http://foo/path/')
736
t4 = get_transport('http://foo/path', possible_transports=[t3])
737
self.assertIs(t3, t4)
739
t5 = get_transport('http://foo/path')
740
t6 = get_transport('http://foo/path/', possible_transports=[t5])
741
self.assertIs(t5, t6)
743
def test_don_t_reuse_different_transport(self):
744
t1 = get_transport('http://foo/path')
745
t2 = get_transport('http://bar/path', possible_transports=[t1])
746
self.assertIsNot(t1, t2)
749
class TestTransportTrace(TestCase):
752
transport = get_transport('trace+memory://')
753
self.assertIsInstance(
754
transport, bzrlib.transport.trace.TransportTraceDecorator)
756
def test_clone_preserves_activity(self):
757
transport = get_transport('trace+memory://')
758
transport2 = transport.clone('.')
759
self.assertTrue(transport is not transport2)
760
self.assertTrue(transport._activity is transport2._activity)
762
# the following specific tests are for the operations that have made use of
763
# logging in tests; we could test every single operation but doing that
764
# still won't cause a test failure when the top level Transport API
765
# changes; so there is little return doing that.
767
transport = get_transport('trace+memory:///')
768
transport.put_bytes('foo', 'barish')
771
# put_bytes records the bytes, not the content to avoid memory
773
expected_result.append(('put_bytes', 'foo', 6, None))
774
# get records the file name only.
775
expected_result.append(('get', 'foo'))
776
self.assertEqual(expected_result, transport._activity)
778
def test_readv(self):
779
transport = get_transport('trace+memory:///')
780
transport.put_bytes('foo', 'barish')
781
list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
784
# put_bytes records the bytes, not the content to avoid memory
786
expected_result.append(('put_bytes', 'foo', 6, None))
787
# readv records the supplied offset request
788
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
789
self.assertEqual(expected_result, transport._activity)