1
# Copyright (C) 2004, 2005 by Canonical Ltd
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21
18
from cStringIO import StringIO
23
from bzrlib.errors import (NoSuchFile, FileExists,
24
TransportNotPossible, ConnectionError)
25
from bzrlib.tests import TestCase
26
from bzrlib.transport import (_get_protocol_handlers,
26
from bzrlib.errors import (DependencyNotPresent,
34
from bzrlib.tests import TestCase, TestCaseInTempDir
35
from bzrlib.transport import (_clear_protocol_handlers,
38
_get_protocol_handlers,
39
_set_protocol_handlers,
27
40
_get_transport_modules,
29
43
register_lazy_transport,
30
_set_protocol_handlers,
44
register_transport_proto,
47
from bzrlib.transport.chroot import ChrootServer
48
from bzrlib.transport.memory import MemoryTransport
49
from bzrlib.transport.local import (LocalTransport,
50
EmulatedWin32LocalTransport)
53
# TODO: Should possibly split transport-specific tests into their own files.
35
56
class TestTransport(TestCase):
36
57
"""Test the non transport-concrete class functionality."""
38
def test_urlescape(self):
    """A literal percent sign is escaped to '%25'."""
    escaped = urlescape('%')
    self.assertEqual('%25', escaped)
41
59
def test__get_set_protocol_handlers(self):
42
60
handlers = _get_protocol_handlers()
43
self.assertNotEqual({}, handlers)
61
self.assertNotEqual([], handlers.keys( ))
45
_set_protocol_handlers({})
46
self.assertEqual({}, _get_protocol_handlers())
63
_clear_protocol_handlers()
64
self.assertEqual([], _get_protocol_handlers().keys())
48
66
_set_protocol_handlers(handlers)
50
68
def test_get_transport_modules(self):
51
69
handlers = _get_protocol_handlers()
70
# don't pollute the current handlers
71
_clear_protocol_handlers()
52
72
class SampleHandler(object):
53
73
"""I exist, isn't that enough?"""
56
_set_protocol_handlers(my_handlers)
57
register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
58
register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
59
self.assertEqual([SampleHandler.__module__],
75
_clear_protocol_handlers()
76
register_transport_proto('foo')
77
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
'TestTransport.SampleHandler')
79
register_transport_proto('bar')
80
register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
'TestTransport.SampleHandler')
82
self.assertEqual([SampleHandler.__module__,
83
'bzrlib.transport.chroot'],
60
84
_get_transport_modules())
62
86
_set_protocol_handlers(handlers)
65
class MemoryTransportTest(TestCase):
66
"""Memory transport specific tests."""
88
def test_transport_dependency(self):
89
"""Transport with missing dependency causes no error"""
90
saved_handlers = _get_protocol_handlers()
91
# don't pollute the current handlers
92
_clear_protocol_handlers()
94
register_transport_proto('foo')
95
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
'BadTransportHandler')
98
get_transport('foo://fooserver/foo')
99
except UnsupportedProtocol, e:
101
self.assertEquals('Unsupported protocol'
102
' for url "foo://fooserver/foo":'
103
' Unable to import library "some_lib":'
104
' testing missing dependency', str(e))
106
self.fail('Did not raise UnsupportedProtocol')
108
# restore original values
109
_set_protocol_handlers(saved_handlers)
111
def test_transport_fallback(self):
112
"""Transport with missing dependency falls back to the other handler"""
113
saved_handlers = _get_protocol_handlers()
115
_clear_protocol_handlers()
116
register_transport_proto('foo')
117
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
'BackupTransportHandler')
119
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
120
'BadTransportHandler')
121
t = get_transport('foo://fooserver/foo')
122
self.assertTrue(isinstance(t, BackupTransportHandler))
124
_set_protocol_handlers(saved_handlers)
126
def test_ssh_hints(self):
127
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
129
get_transport('ssh://fooserver/foo')
130
except UnsupportedProtocol, e:
132
self.assertEquals('Unsupported protocol'
133
' for url "ssh://fooserver/foo":'
134
' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
137
self.fail('Did not raise UnsupportedProtocol')
139
def test_LateReadError(self):
140
"""The LateReadError helper should raise on read()."""
141
a_file = LateReadError('a path')
144
except ReadError, error:
145
self.assertEqual('a path', error.path)
146
self.assertRaises(ReadError, a_file.read, 40)
149
def test__combine_paths(self):
151
self.assertEqual('/home/sarah/project/foo',
152
t._combine_paths('/home/sarah', 'project/foo'))
153
self.assertEqual('/etc',
154
t._combine_paths('/home/sarah', '../../etc'))
155
self.assertEqual('/etc',
156
t._combine_paths('/home/sarah', '../../../etc'))
157
self.assertEqual('/etc',
158
t._combine_paths('/home/sarah', '/etc'))
160
def test_local_abspath_non_local_transport(self):
    """local_abspath() on a memory transport raises NotLocalUrl."""
    # the base implementation should throw
    memory_transport = MemoryTransport()
    err = self.assertRaises(errors.NotLocalUrl,
                            memory_transport.local_abspath, 't')
    self.assertEqual('memory:///t is not a local path.', str(err))
167
class TestCoalesceOffsets(TestCase):
169
def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
170
coalesce = Transport._coalesce_offsets
171
exp = [_CoalescedOffset(*x) for x in expected]
172
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
174
self.assertEqual(exp, out)
176
def test_coalesce_empty(self):
179
def test_coalesce_simple(self):
    """A single offset coalesces into one range holding only itself."""
    requested = [(0, 10)]
    expected = [(0, 10, [(0, 10)])]
    self.check(expected, requested)
182
def test_coalesce_unrelated(self):
183
self.check([(0, 10, [(0, 10)]),
185
], [(0, 10), (20, 10)])
187
def test_coalesce_unsorted(self):
188
self.check([(20, 10, [(0, 10)]),
190
], [(20, 10), (0, 10)])
192
def test_coalesce_nearby(self):
193
self.check([(0, 20, [(0, 10), (10, 10)])],
196
def test_coalesce_overlapped(self):
197
self.assertRaises(ValueError,
198
self.check, [(0, 15, [(0, 10), (5, 10)])],
201
def test_coalesce_limit(self):
202
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
203
(30, 10), (40, 10)]),
204
(60, 50, [(0, 10), (10, 10), (20, 10),
205
(30, 10), (40, 10)]),
206
], [(10, 10), (20, 10), (30, 10), (40, 10),
207
(50, 10), (60, 10), (70, 10), (80, 10),
208
(90, 10), (100, 10)],
211
def test_coalesce_no_limit(self):
212
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
213
(30, 10), (40, 10), (50, 10),
214
(60, 10), (70, 10), (80, 10),
216
], [(10, 10), (20, 10), (30, 10), (40, 10),
217
(50, 10), (60, 10), (70, 10), (80, 10),
218
(90, 10), (100, 10)])
220
def test_coalesce_fudge(self):
221
self.check([(10, 30, [(0, 10), (20, 10)]),
222
(100, 10, [(0, 10),]),
223
], [(10, 10), (30, 10), (100, 10)],
226
def test_coalesce_max_size(self):
227
self.check([(10, 20, [(0, 10), (10, 10)]),
229
# If one range is above max_size, it gets its own coalesced
231
(100, 80, [(0, 80),]),],
232
[(10, 10), (20, 10), (30, 50), (100, 80)],
236
def test_coalesce_no_max_size(self):
237
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
238
[(10, 10), (20, 10), (30, 50), (80, 100)],
241
def test_coalesce_default_limit(self):
    """Eleven adjacent 10MB ranges split under the default max size."""
    # By default we use a 100MB max size.
    chunk = 10 * 1024 * 1024

    def adjacent_ranges(count):
        # `count` back-to-back (start, length) pairs of one chunk each.
        return [(index * chunk, chunk) for index in range(count)]

    # No max_size given: the eleventh range gets its own coalesced entry.
    self.check([(0, 10 * chunk, adjacent_ranges(10)),
                (10 * chunk, chunk, [(0, chunk)])],
               adjacent_ranges(11))
    # With a 1GB max_size all eleven ranges are coalesced together.
    self.check([(0, 11 * chunk, adjacent_ranges(11))],
               adjacent_ranges(11),
               max_size=1 * 1024 * 1024 * 1024)
252
class TestMemoryTransport(TestCase):
254
def test_get_transport(self):
257
def test_clone(self):
    """Cloning a memory transport to '/' keeps the memory:/// base."""
    transport = MemoryTransport()
    # assertIsInstance reports the offending object on failure, which
    # assertTrue(isinstance(...)) does not.
    self.assertIsInstance(transport, MemoryTransport)
    self.assertEqual("memory:///", transport.clone("/").base)
262
def test_abspath(self):
    """abspath() of a relative path is rooted at memory:///."""
    memory_transport = MemoryTransport()
    absolute_url = memory_transport.abspath('relpath')
    self.assertEqual("memory:///relpath", absolute_url)
266
def test_abspath_of_root(self):
    """Both the base and abspath('/') of a new transport are memory:///."""
    memory_transport = MemoryTransport()
    root_url = "memory:///"
    self.assertEqual(root_url, memory_transport.base)
    self.assertEqual(root_url, memory_transport.abspath('/'))
271
def test_abspath_of_relpath_starting_at_root(self):
    """abspath('/foo') resolves to memory:///foo."""
    memory_transport = MemoryTransport()
    absolute_url = memory_transport.abspath('/foo')
    self.assertEqual("memory:///foo", absolute_url)
275
def test_append_and_get(self):
    """append_bytes() then append_file() accumulate content for get()."""
    store = MemoryTransport()
    store.append_bytes('path', 'content')
    after_bytes = store.get('path').read()
    self.assertEqual(after_bytes, 'content')
    store.append_file('path', StringIO('content'))
    after_file = store.get('path').read()
    self.assertEqual(after_file, 'contentcontent')
282
def test_put_and_get(self):
    """put_file() and put_bytes() each leave exactly the given content."""
    store = MemoryTransport()
    store.put_file('path', StringIO('content'))
    after_file = store.get('path').read()
    self.assertEqual(after_file, 'content')
    store.put_bytes('path', 'content')
    after_bytes = store.get('path').read()
    self.assertEqual(after_bytes, 'content')
289
def test_append_without_dir_fails(self):
    """append_bytes() into a directory that was never made is NoSuchFile."""
    store = MemoryTransport()
    append = store.append_bytes
    self.assertRaises(NoSuchFile, append, 'dir/path', 'content')
294
def test_put_without_dir_fails(self):
    """put_file() into a directory that was never made is NoSuchFile."""
    store = MemoryTransport()
    payload = StringIO('content')
    self.assertRaises(NoSuchFile, store.put_file, 'dir/path', payload)
299
def test_get_missing(self):
    """get() of a path that was never written raises NoSuchFile."""
    empty_store = MemoryTransport()
    self.assertRaises(NoSuchFile, empty_store.get, 'foo')
303
def test_has_missing(self):
    """has() returns False for a path that was never written."""
    transport = MemoryTransport()
    # assertEqual rather than the deprecated assertEquals alias; comparing
    # against False keeps the check strict about the returned value.
    self.assertEqual(False, transport.has('foo'))
307
def test_has_present(self):
    """has() returns True once content has been appended to a path."""
    transport = MemoryTransport()
    transport.append_bytes('foo', 'content')
    # assertEqual rather than the deprecated assertEquals alias.
    self.assertEqual(True, transport.has('foo'))
312
def test_list_dir(self):
    """list_dir() returns only the direct children of a directory."""
    transport = MemoryTransport()
    transport.put_bytes('foo', 'content')
    transport.mkdir('dir')
    transport.put_bytes('dir/subfoo', 'content')
    # 'dirlike' shares a prefix with 'dir' but is a sibling file, so it
    # must be listed at the top level and not inside 'dir'.
    transport.put_bytes('dirlike', 'content')

    # assertEqual rather than the deprecated assertEquals alias.
    self.assertEqual(['dir', 'dirlike', 'foo'],
                     sorted(transport.list_dir('.')))
    self.assertEqual(['subfoo'], sorted(transport.list_dir('dir')))
322
def test_mkdir(self):
    """After mkdir(), files can be written to and read from the directory."""
    store = MemoryTransport()
    store.mkdir('dir')
    store.append_bytes('dir/path', 'content')
    stored = store.get('dir/path').read()
    self.assertEqual(stored, 'content')
328
def test_mkdir_missing_parent(self):
    """mkdir() below a directory that does not exist raises NoSuchFile."""
    store = MemoryTransport()
    self.assertRaises(NoSuchFile, store.mkdir, 'dir/dir')
333
def test_mkdir_twice(self):
    """A second mkdir() of the same directory raises FileExists."""
    store = MemoryTransport()
    store.mkdir('dir')
    self.assertRaises(FileExists, store.mkdir, 'dir')
68
338
def test_parameters(self):
69
import bzrlib.transport.memory as memory
70
transport = memory.MemoryTransport()
339
transport = MemoryTransport()
71
340
self.assertEqual(True, transport.listable())
72
self.assertEqual(False, transport.should_cache())
73
341
self.assertEqual(False, transport.is_readonly())
343
def test_iter_files_recursive(self):
    """iter_files_recursive() yields files at every level, not directories."""
    store = MemoryTransport()
    store.mkdir('dir')
    for relpath in ('dir/foo', 'dir/bar', 'bar'):
        store.put_bytes(relpath, 'content')
    found = set(store.iter_files_recursive())
    expected = set(['dir/foo', 'dir/bar', 'bar'])
    self.assertEqual(expected, found)
353
transport = MemoryTransport()
354
transport.put_bytes('foo', 'content')
355
transport.put_bytes('bar', 'phowar')
356
self.assertEqual(7, transport.stat('foo').st_size)
357
self.assertEqual(6, transport.stat('bar').st_size)
360
class ChrootDecoratorTransportTest(TestCase):
361
"""Chroot decoration specific tests."""
363
def test_abspath(self):
    """abspath('/') is the server URL for the root and for a subdir clone."""
    # The abspath is always relative to the chroot_url.
    chroot_server = ChrootServer(get_transport('memory:///foo/bar/'))
    self.start_server(chroot_server)
    root = get_transport(chroot_server.get_url())
    self.assertEqual(chroot_server.get_url(), root.abspath('/'))

    below_root = root.clone('subdir')
    self.assertEqual(chroot_server.get_url(), below_root.abspath('/'))
373
def test_clone(self):
    """Clones made by relpath and by abspath keep the same server."""
    chroot_server = ChrootServer(get_transport('memory:///foo/bar/'))
    self.start_server(chroot_server)
    root = get_transport(chroot_server.get_url())
    # relpath from root and root path are the same
    by_relpath = root.clone('foo')
    by_abspath = root.clone('/foo')
    for cloned in (by_relpath, by_abspath):
        self.assertEqual(chroot_server, cloned.server)
383
def test_chroot_url_preserves_chroot(self):
384
"""Calling get_transport on a chroot transport's base should produce a
385
transport with exactly the same behaviour as the original chroot
388
This is so that it is not possible to escape a chroot by doing::
389
url = chroot_transport.base
390
parent_url = urlutils.join(url, '..')
391
new_transport = get_transport(parent_url)
393
server = ChrootServer(get_transport('memory:///path/subpath'))
394
self.start_server(server)
395
transport = get_transport(server.get_url())
396
new_transport = get_transport(transport.base)
397
self.assertEqual(transport.server, new_transport.server)
398
self.assertEqual(transport.base, new_transport.base)
400
def test_urljoin_preserves_chroot(self):
401
"""Using urlutils.join(url, '..') on a chroot URL should not produce a
402
URL that escapes the intended chroot.
404
This is so that it is not possible to escape a chroot by doing::
405
url = chroot_transport.base
406
parent_url = urlutils.join(url, '..')
407
new_transport = get_transport(parent_url)
409
server = ChrootServer(get_transport('memory:///path/'))
410
self.start_server(server)
411
transport = get_transport(server.get_url())
413
InvalidURLJoin, urlutils.join, transport.base, '..')
416
class ChrootServerTest(TestCase):
418
def test_construct(self):
    """The server exposes the transport it was constructed with."""
    backing = MemoryTransport()
    chroot_server = ChrootServer(backing)
    self.assertEqual(backing, chroot_server.backing_transport)
423
def test_setUp(self):
424
backing_transport = MemoryTransport()
425
server = ChrootServer(backing_transport)
428
self.assertTrue(server.scheme in _get_protocol_handlers().keys())
432
def test_tearDown(self):
433
backing_transport = MemoryTransport()
434
server = ChrootServer(backing_transport)
437
self.assertFalse(server.scheme in _get_protocol_handlers().keys())
439
def test_get_url(self):
440
backing_transport = MemoryTransport()
441
server = ChrootServer(backing_transport)
444
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
76
449
class ReadonlyDecoratorTransportTest(TestCase):
77
450
"""Readonly decoration specific tests."""
81
454
# connect to . in readonly mode
82
455
transport = readonly.ReadonlyTransportDecorator('readonly+.')
83
456
self.assertEqual(True, transport.listable())
84
self.assertEqual(False, transport.should_cache())
85
457
self.assertEqual(True, transport.is_readonly())
87
459
def test_http_parameters(self):
460
from bzrlib.tests.http_server import HttpServer
88
461
import bzrlib.transport.readonly as readonly
89
from bzrlib.transport.http import HttpServer
90
# connect to . via http which is not listable
94
transport = get_transport('readonly+' + server.get_url())
95
self.failUnless(isinstance(transport,
96
readonly.ReadonlyTransportDecorator))
97
self.assertEqual(False, transport.listable())
98
self.assertEqual(True, transport.should_cache())
99
self.assertEqual(True, transport.is_readonly())
462
# connect to '.' via http which is not listable
463
server = HttpServer()
464
self.start_server(server)
465
transport = get_transport('readonly+' + server.get_url())
466
self.failUnless(isinstance(transport,
467
readonly.ReadonlyTransportDecorator))
468
self.assertEqual(False, transport.listable())
469
self.assertEqual(True, transport.is_readonly())
472
class FakeNFSDecoratorTests(TestCaseInTempDir):
473
"""NFS decorator specific tests."""
475
def get_nfs_transport(self, url):
    """Return a FakeNFSTransportDecorator for 'fakenfs+' + url."""
    from bzrlib.transport.fakenfs import FakeNFSTransportDecorator
    # connect to url with nfs decoration
    decorated_url = 'fakenfs+' + url
    return FakeNFSTransportDecorator(decorated_url)
480
def test_local_parameters(self):
    """A fakenfs transport on '.' is listable and not read-only."""
    # the listable and is_readonly parameters
    # are not changed by the fakenfs decorator
    nfs = self.get_nfs_transport('.')
    self.assertEqual(True, nfs.listable())
    self.assertEqual(False, nfs.is_readonly())
487
def test_http_parameters(self):
    """A fakenfs transport over http is read-only and not listable."""
    # the listable and is_readonly parameters
    # are not changed by the fakenfs decorator
    from bzrlib.tests.http_server import HttpServer
    # connect to '.' via http which is not listable
    http_server = HttpServer()
    self.start_server(http_server)
    nfs = self.get_nfs_transport(http_server.get_url())
    decorator_class = bzrlib.transport.fakenfs.FakeNFSTransportDecorator
    self.assertIsInstance(nfs, decorator_class)
    self.assertEqual(False, nfs.listable())
    self.assertEqual(True, nfs.is_readonly())
500
def test_fakenfs_server_default(self):
    """A default FakeNFSServer serves a 'fakenfs+' URL usable as transport."""
    # a FakeNFSServer() should bring up a local relpath server for itself
    from bzrlib.transport.fakenfs import (
        FakeNFSServer,
        FakeNFSTransportDecorator,
        )
    nfs_server = FakeNFSServer()
    self.start_server(nfs_server)
    # the url should be decorated appropriately
    self.assertStartsWith(nfs_server.get_url(), 'fakenfs+')
    # and we should be able to get a transport for it
    nfs = get_transport(nfs_server.get_url())
    # which must be a FakeNFSTransportDecorator instance.
    self.assertIsInstance(nfs, FakeNFSTransportDecorator)
512
def test_fakenfs_rename_semantics(self):
513
# a FakeNFS transport must mangle the way rename errors occur to
514
# look like NFS problems.
515
transport = self.get_nfs_transport('.')
516
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
518
self.assertRaises(errors.ResourceBusy,
519
transport.rename, 'from', 'to')
522
class FakeVFATDecoratorTests(TestCaseInTempDir):
    """Tests for simulation of VFAT restrictions"""

    def get_vfat_transport(self, url):
        """Return vfat-backed transport for test directory"""
        from bzrlib.transport import fakevfat
        return fakevfat.FakeVFATTransportDecorator('vfat+' + url)

    def test_transport_creation(self):
        """The helper hands back a FakeVFATTransportDecorator."""
        from bzrlib.transport import fakevfat
        vfat = self.get_vfat_transport('.')
        self.assertIsInstance(vfat, fakevfat.FakeVFATTransportDecorator)

    def test_transport_mkdir(self):
        """A directory made as 'HELLO' is found under other casings."""
        vfat = self.get_vfat_transport('.')
        vfat.mkdir('HELLO')
        for spelling in ('hello', 'Hello'):
            self.assertTrue(vfat.has(spelling))

    def test_forbidden_chars(self):
        """has() raises ValueError for the name "<NU>"."""
        vfat = self.get_vfat_transport('.')
        self.assertRaises(ValueError, vfat.has, "<NU>")
546
class BadTransportHandler(Transport):
    """Test transport whose constructor always raises DependencyNotPresent."""

    def __init__(self, base_url):
        missing_library = 'some_lib'
        reason = 'testing missing dependency'
        raise DependencyNotPresent(missing_library, reason)
551
class BackupTransportHandler(Transport):
552
"""Test transport that works as a backup for the BadTransportHandler"""
556
class TestTransportImplementation(TestCaseInTempDir):
557
"""Implementation verification for transports.
559
To verify a transport we need a server factory, which is a callable
560
that accepts no parameters and returns an implementation of
561
bzrlib.transport.Server.
563
That Server is then used to construct transport instances and test
564
the transport via loopback activity.
566
Currently this assumes that the Transport object is connected to the
567
current working directory. So that whatever is done
568
through the transport, should show up in the working
569
directory, and vice-versa. This is a bug, because it's possible to have
570
URL schemes which provide access to something that may not
571
result in storage on the local disk, i.e. due to file system limits, or
572
due to it being a database or some other non-filesystem tool.
574
This also tests to make sure that the functions work with both
575
generators and lists (assuming iter(list) is effectively a generator)
579
super(TestTransportImplementation, self).setUp()
580
self._server = self.transport_server()
581
self.start_server(self._server)
583
def get_transport(self, relpath=None):
584
"""Return a connected transport to the local directory.
586
:param relpath: a path relative to the base url.
588
base_url = self._server.get_url()
589
url = self._adjust_url(base_url, relpath)
590
# try getting the transport via the regular interface:
591
t = get_transport(url)
592
# vila--20070607 if the following are commented out the test suite
593
# still pass. Is this really still needed or was it a forgotten
595
if not isinstance(t, self.transport_class):
596
# we did not get the correct transport class type. Override the
597
# regular connection behaviour by direct construction.
598
t = self.transport_class(url)
602
class TestLocalTransports(TestCase):
    """get_transport() on local paths and URLs yields a LocalTransport."""

    def test_get_transport_from_abspath(self):
        """An absolute filesystem path maps to its file URL plus '/'."""
        here = osutils.abspath('.')
        t = get_transport(here)
        self.assertIsInstance(t, LocalTransport)
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')

    def test_get_transport_from_relpath(self):
        """A relative path maps to the URL of that path plus '/'."""
        # The absolute path is not needed here: the expected base is
        # computed from '.' directly.
        t = get_transport('.')
        self.assertIsInstance(t, LocalTransport)
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')

    def test_get_transport_from_local_url(self):
        """A local URL is used unchanged as the transport base."""
        here = osutils.abspath('.')
        here_url = urlutils.local_path_to_url(here) + '/'
        t = get_transport(here_url)
        self.assertIsInstance(t, LocalTransport)
        self.assertEqual(t.base, here_url)

    def test_local_abspath(self):
        """local_abspath('') gives back the path the transport was built on."""
        here = osutils.abspath('.')
        t = get_transport(here)
        self.assertEqual(t.local_abspath(''), here)
629
class TestWin32LocalTransport(TestCase):
631
def test_unc_clone_to_root(self):
632
# Win32 UNC path like \\HOST\path
633
# clone to root should stop at least at \\HOST part
635
t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
638
self.assertEquals(t.base, 'file://HOST/')
639
# make sure we reach the root
641
self.assertEquals(t.base, 'file://HOST/')
644
class TestConnectedTransport(TestCase):
645
"""Tests for connected to remote server transports"""
647
def test_parse_url(self):
    """A URL without port or credentials parses to None for those parts."""
    t = ConnectedTransport('http://simple.example.com/home/source')
    self.assertEqual(t._host, 'simple.example.com')
    self.assertEqual(t._port, None)
    # The parsed path carries a trailing slash.
    self.assertEqual(t._path, '/home/source/')
    # assertIs shows the unexpected value on failure, unlike the
    # deprecated failUnless(x is None).
    self.assertIs(None, t._user)
    self.assertIs(None, t._password)

    self.assertEqual(t.base, 'http://simple.example.com/home/source/')
657
def test_parse_url_with_at_in_user(self):
659
t = ConnectedTransport('ftp://user@host.com@www.host.com/')
660
self.assertEquals(t._user, 'user@host.com')
662
def test_parse_quoted_url(self):
    """Percent-escapes in host, user and password are decoded."""
    t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
    # assertEqual rather than the deprecated assertEquals alias.
    self.assertEqual(t._host, 'exAmple.com')
    self.assertEqual(t._port, 2222)
    self.assertEqual(t._user, 'robey')
    self.assertEqual(t._password, 'h@t')
    self.assertEqual(t._path, '/path/')

    # Base should not keep track of the password
    self.assertEqual(t.base, 'http://robey@exAmple.com:2222/path/')
673
def test_parse_invalid_url(self):
674
self.assertRaises(errors.InvalidURL,
676
'sftp://lily.org:~janneke/public/bzr/gub')
678
def test_relpath(self):
    """relpath() rejects URLs whose scheme, user, host or port differ."""
    t = ConnectedTransport('sftp://user@host.com/abs/path')

    # assertEqual rather than the deprecated assertEquals alias.
    self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
    # Each of these differs from t's base in exactly one of scheme, user,
    # host and port, in that order.
    for foreign_url in ('http://user@host.com/abs/path/sub',
                        'sftp://user2@host.com/abs/path/sub',
                        'sftp://user@otherhost.com/abs/path/sub',
                        'sftp://user@host.com:33/abs/path/sub'):
        self.assertRaises(errors.PathNotChild, t.relpath, foreign_url)
    # Make sure it works when we don't supply a username
    t = ConnectedTransport('sftp://host.com/abs/path')
    self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')

    # Make sure it works when parts of the path will be url encoded
    t = ConnectedTransport('sftp://host.com/dev/%path')
    self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
698
def test_connection_sharing_propagate_credentials(self):
699
t = ConnectedTransport('ftp://user@host.com/abs/path')
700
self.assertEquals('user', t._user)
701
self.assertEquals('host.com', t._host)
702
self.assertIs(None, t._get_connection())
703
self.assertIs(None, t._password)
704
c = t.clone('subdir')
705
self.assertIs(None, c._get_connection())
706
self.assertIs(None, t._password)
708
# Simulate the user entering a password
710
connection = object()
711
t._set_connection(connection, password)
712
self.assertIs(connection, t._get_connection())
713
self.assertIs(password, t._get_credentials())
714
self.assertIs(connection, c._get_connection())
715
self.assertIs(password, c._get_credentials())
717
# credentials can be updated
718
new_password = 'even more secret'
719
c._update_credentials(new_password)
720
self.assertIs(connection, t._get_connection())
721
self.assertIs(new_password, t._get_credentials())
722
self.assertIs(connection, c._get_connection())
723
self.assertIs(new_password, c._get_credentials())
726
class TestReusedTransports(TestCase):
    """Tests for transport reuse"""

    def test_reuse_same_transport(self):
        """A transport listed in possible_transports is reused for its URL."""
        candidates = []
        first = get_transport('http://foo/', possible_transports=candidates)
        self.assertEqual([first], candidates)
        again = get_transport('http://foo/', possible_transports=[first])
        self.assertIs(first, again)

        # Also check that final '/' are handled correctly
        with_slash = get_transport('http://foo/path/')
        reused = get_transport('http://foo/path',
                               possible_transports=[with_slash])
        self.assertIs(with_slash, reused)

        without_slash = get_transport('http://foo/path')
        reused = get_transport('http://foo/path/',
                               possible_transports=[without_slash])
        self.assertIs(without_slash, reused)

    def test_don_t_reuse_different_transport(self):
        """A transport for another host is not handed back."""
        foo_transport = get_transport('http://foo/path')
        bar_transport = get_transport('http://bar/path',
                                      possible_transports=[foo_transport])
        self.assertIsNot(foo_transport, bar_transport)
752
class TestTransportTrace(TestCase):
755
transport = get_transport('trace+memory://')
756
self.assertIsInstance(
757
transport, bzrlib.transport.trace.TransportTraceDecorator)
759
def test_clone_preserves_activity(self):
    """A clone is a distinct object sharing the original's _activity."""
    transport = get_transport('trace+memory://')
    transport2 = transport.clone('.')
    # assertIsNot/assertIs show both objects on failure, unlike
    # assertTrue(a is b).
    self.assertIsNot(transport, transport2)
    self.assertIs(transport._activity, transport2._activity)
765
# the following specific tests are for the operations that have made use of
766
# logging in tests; we could test every single operation but doing that
767
# still won't cause a test failure when the top level Transport API
768
# changes; so there is little return doing that.
770
transport = get_transport('trace+memory:///')
771
transport.put_bytes('foo', 'barish')
774
# put_bytes records the bytes, not the content to avoid memory
776
expected_result.append(('put_bytes', 'foo', 6, None))
777
# get records the file name only.
778
expected_result.append(('get', 'foo'))
779
self.assertEqual(expected_result, transport._activity)
781
def test_readv(self):
782
transport = get_transport('trace+memory:///')
783
transport.put_bytes('foo', 'barish')
784
list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
787
# put_bytes records the bytes, not the content to avoid memory
789
expected_result.append(('put_bytes', 'foo', 6, None))
790
# readv records the supplied offset request
791
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
792
self.assertEqual(expected_result, transport._activity)