101
124
_set_protocol_handlers(saved_handlers)
126
def test_ssh_hints(self):
127
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
129
get_transport('ssh://fooserver/foo')
130
except UnsupportedProtocol, e:
132
self.assertEquals('Unsupported protocol'
133
' for url "ssh://fooserver/foo":'
134
' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
137
self.fail('Did not raise UnsupportedProtocol')
139
def test_LateReadError(self):
140
"""The LateReadError helper should raise on read()."""
141
a_file = LateReadError('a path')
144
except ReadError, error:
145
self.assertEqual('a path', error.path)
146
self.assertRaises(ReadError, a_file.read, 40)
149
def test__combine_paths(self):
151
self.assertEqual('/home/sarah/project/foo',
152
t._combine_paths('/home/sarah', 'project/foo'))
153
self.assertEqual('/etc',
154
t._combine_paths('/home/sarah', '../../etc'))
155
self.assertEqual('/etc',
156
t._combine_paths('/home/sarah', '../../../etc'))
157
self.assertEqual('/etc',
158
t._combine_paths('/home/sarah', '/etc'))
160
def test_local_abspath_non_local_transport(self):
161
# the base implementation should throw
162
t = MemoryTransport()
163
e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
164
self.assertEqual('memory:///t is not a local path.', str(e))
104
167
class TestCoalesceOffsets(TestCase):
106
def check(self, expected, offsets, limit=0, fudge=0):
169
def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
107
170
coalesce = Transport._coalesce_offsets
108
171
exp = [_CoalescedOffset(*x) for x in expected]
109
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
172
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
110
174
self.assertEqual(exp, out)
112
176
def test_coalesce_empty(self):
158
223
], [(10, 10), (30, 10), (100, 10)],
226
def test_coalesce_max_size(self):
227
self.check([(10, 20, [(0, 10), (10, 10)]),
229
# If one range is above max_size, it gets its own coalesced
231
(100, 80, [(0, 80),]),],
232
[(10, 10), (20, 10), (30, 50), (100, 80)],
236
def test_coalesce_no_max_size(self):
237
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
238
[(10, 10), (20, 10), (30, 50), (80, 100)],
241
def test_coalesce_default_limit(self):
242
# By default we use a 100MB max size.
243
ten_mb = 10*1024*1024
244
self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
245
(10*ten_mb, ten_mb, [(0, ten_mb)])],
246
[(i*ten_mb, ten_mb) for i in range(11)])
247
self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
248
[(i*ten_mb, ten_mb) for i in range(11)],
249
max_size=1*1024*1024*1024)
163
252
class TestMemoryTransport(TestCase):
168
257
def test_clone(self):
169
258
transport = MemoryTransport()
170
259
self.assertTrue(isinstance(transport, MemoryTransport))
260
self.assertEqual("memory:///", transport.clone("/").base)
172
262
def test_abspath(self):
173
263
transport = MemoryTransport()
174
264
self.assertEqual("memory:///relpath", transport.abspath('relpath'))
176
def test_relpath(self):
177
transport = MemoryTransport()
266
def test_abspath_of_root(self):
267
transport = MemoryTransport()
268
self.assertEqual("memory:///", transport.base)
269
self.assertEqual("memory:///", transport.abspath('/'))
271
def test_abspath_of_relpath_starting_at_root(self):
272
transport = MemoryTransport()
273
self.assertEqual("memory:///foo", transport.abspath('/foo'))
179
275
def test_append_and_get(self):
180
276
transport = MemoryTransport()
181
transport.append('path', StringIO('content'))
277
transport.append_bytes('path', 'content')
182
278
self.assertEqual(transport.get('path').read(), 'content')
183
transport.append('path', StringIO('content'))
279
transport.append_file('path', StringIO('content'))
184
280
self.assertEqual(transport.get('path').read(), 'contentcontent')
186
282
def test_put_and_get(self):
187
283
transport = MemoryTransport()
188
transport.put('path', StringIO('content'))
284
transport.put_file('path', StringIO('content'))
189
285
self.assertEqual(transport.get('path').read(), 'content')
190
transport.put('path', StringIO('content'))
286
transport.put_bytes('path', 'content')
191
287
self.assertEqual(transport.get('path').read(), 'content')
193
289
def test_append_without_dir_fails(self):
194
290
transport = MemoryTransport()
195
291
self.assertRaises(NoSuchFile,
196
transport.append, 'dir/path', StringIO('content'))
292
transport.append_bytes, 'dir/path', 'content')
198
294
def test_put_without_dir_fails(self):
199
295
transport = MemoryTransport()
200
296
self.assertRaises(NoSuchFile,
201
transport.put, 'dir/path', StringIO('content'))
297
transport.put_file, 'dir/path', StringIO('content'))
203
299
def test_get_missing(self):
204
300
transport = MemoryTransport()
232
338
def test_parameters(self):
233
339
transport = MemoryTransport()
234
340
self.assertEqual(True, transport.listable())
235
self.assertEqual(False, transport.should_cache())
236
341
self.assertEqual(False, transport.is_readonly())
238
343
def test_iter_files_recursive(self):
239
344
transport = MemoryTransport()
240
345
transport.mkdir('dir')
241
transport.put('dir/foo', StringIO('content'))
242
transport.put('dir/bar', StringIO('content'))
243
transport.put('bar', StringIO('content'))
346
transport.put_bytes('dir/foo', 'content')
347
transport.put_bytes('dir/bar', 'content')
348
transport.put_bytes('bar', 'content')
244
349
paths = set(transport.iter_files_recursive())
245
350
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
247
352
def test_stat(self):
248
353
transport = MemoryTransport()
249
transport.put('foo', StringIO('content'))
250
transport.put('bar', StringIO('phowar'))
354
transport.put_bytes('foo', 'content')
355
transport.put_bytes('bar', 'phowar')
251
356
self.assertEqual(7, transport.stat('foo').st_size)
252
357
self.assertEqual(6, transport.stat('bar').st_size)
360
class ChrootDecoratorTransportTest(TestCase):
361
"""Chroot decoration specific tests."""
363
def test_abspath(self):
364
# The abspath is always relative to the chroot_url.
365
server = ChrootServer(get_transport('memory:///foo/bar/'))
366
self.start_server(server)
367
transport = get_transport(server.get_url())
368
self.assertEqual(server.get_url(), transport.abspath('/'))
370
subdir_transport = transport.clone('subdir')
371
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
373
def test_clone(self):
374
server = ChrootServer(get_transport('memory:///foo/bar/'))
375
self.start_server(server)
376
transport = get_transport(server.get_url())
377
# relpath from root and root path are the same
378
relpath_cloned = transport.clone('foo')
379
abspath_cloned = transport.clone('/foo')
380
self.assertEqual(server, relpath_cloned.server)
381
self.assertEqual(server, abspath_cloned.server)
383
def test_chroot_url_preserves_chroot(self):
384
"""Calling get_transport on a chroot transport's base should produce a
385
transport with exactly the same behaviour as the original chroot
388
This is so that it is not possible to escape a chroot by doing::
389
url = chroot_transport.base
390
parent_url = urlutils.join(url, '..')
391
new_transport = get_transport(parent_url)
393
server = ChrootServer(get_transport('memory:///path/subpath'))
394
self.start_server(server)
395
transport = get_transport(server.get_url())
396
new_transport = get_transport(transport.base)
397
self.assertEqual(transport.server, new_transport.server)
398
self.assertEqual(transport.base, new_transport.base)
400
def test_urljoin_preserves_chroot(self):
401
"""Using urlutils.join(url, '..') on a chroot URL should not produce a
402
URL that escapes the intended chroot.
404
This is so that it is not possible to escape a chroot by doing::
405
url = chroot_transport.base
406
parent_url = urlutils.join(url, '..')
407
new_transport = get_transport(parent_url)
409
server = ChrootServer(get_transport('memory:///path/'))
410
self.start_server(server)
411
transport = get_transport(server.get_url())
413
InvalidURLJoin, urlutils.join, transport.base, '..')
416
class ChrootServerTest(TestCase):
418
def test_construct(self):
419
backing_transport = MemoryTransport()
420
server = ChrootServer(backing_transport)
421
self.assertEqual(backing_transport, server.backing_transport)
423
def test_setUp(self):
424
backing_transport = MemoryTransport()
425
server = ChrootServer(backing_transport)
428
self.assertTrue(server.scheme in _get_protocol_handlers().keys())
432
def test_tearDown(self):
433
backing_transport = MemoryTransport()
434
server = ChrootServer(backing_transport)
437
self.assertFalse(server.scheme in _get_protocol_handlers().keys())
439
def test_get_url(self):
440
backing_transport = MemoryTransport()
441
server = ChrootServer(backing_transport)
444
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
255
449
class ReadonlyDecoratorTransportTest(TestCase):
256
450
"""Readonly decoration specific tests."""
289
478
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
291
480
def test_local_parameters(self):
292
# the listable, should_cache and is_readonly parameters
481
# the listable and is_readonly parameters
293
482
# are not changed by the fakenfs decorator
294
483
transport = self.get_nfs_transport('.')
295
484
self.assertEqual(True, transport.listable())
296
self.assertEqual(False, transport.should_cache())
297
485
self.assertEqual(False, transport.is_readonly())
299
487
def test_http_parameters(self):
300
# the listable, should_cache and is_readonly parameters
488
# the listable and is_readonly parameters
301
489
# are not changed by the fakenfs decorator
302
from bzrlib.transport.http import HttpServer
303
# connect to . via http which is not listable
490
from bzrlib.tests.http_server import HttpServer
491
# connect to '.' via http which is not listable
304
492
server = HttpServer()
307
transport = self.get_nfs_transport(server.get_url())
308
self.assertIsInstance(
309
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
310
self.assertEqual(False, transport.listable())
311
self.assertEqual(True, transport.should_cache())
312
self.assertEqual(True, transport.is_readonly())
493
self.start_server(server)
494
transport = self.get_nfs_transport(server.get_url())
495
self.assertIsInstance(
496
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
497
self.assertEqual(False, transport.listable())
498
self.assertEqual(True, transport.is_readonly())
316
500
def test_fakenfs_server_default(self):
317
501
# a FakeNFSServer() should bring up a local relpath server for itself
318
502
import bzrlib.transport.fakenfs as fakenfs
319
503
server = fakenfs.FakeNFSServer()
322
# the server should be a relpath localhost server
323
self.assertEqual(server.get_url(), 'fakenfs+.')
324
# and we should be able to get a transport for it
325
transport = get_transport(server.get_url())
326
# which must be a FakeNFSTransportDecorator instance.
327
self.assertIsInstance(
328
transport, fakenfs.FakeNFSTransportDecorator)
504
self.start_server(server)
505
# the url should be decorated appropriately
506
self.assertStartsWith(server.get_url(), 'fakenfs+')
507
# and we should be able to get a transport for it
508
transport = get_transport(server.get_url())
509
# which must be a FakeNFSTransportDecorator instance.
510
self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
332
512
def test_fakenfs_rename_semantics(self):
333
513
# a FakeNFS transport must mangle the way rename errors occur to
376
556
class TestTransportImplementation(TestCaseInTempDir):
377
557
"""Implementation verification for transports.
379
559
To verify a transport we need a server factory, which is a callable
380
560
that accepts no parameters and returns an implementation of
381
561
bzrlib.transport.Server.
383
563
That Server is then used to construct transport instances and test
384
564
the transport via loopback activity.
386
Currently this assumes that the Transport object is connected to the
387
current working directory. So that whatever is done
388
through the transport, should show up in the working
566
Currently this assumes that the Transport object is connected to the
567
current working directory. So that whatever is done
568
through the transport, should show up in the working
389
569
directory, and vice-versa. This is a bug, because its possible to have
390
URL schemes which provide access to something that may not be
391
result in storage on the local disk, i.e. due to file system limits, or
570
URL schemes which provide access to something that may not be
571
result in storage on the local disk, i.e. due to file system limits, or
392
572
due to it being a database or some other non-filesystem tool.
394
574
This also tests to make sure that the functions work with both
395
575
generators and lists (assuming iter(list) is effectively a generator)
399
579
super(TestTransportImplementation, self).setUp()
400
580
self._server = self.transport_server()
404
super(TestTransportImplementation, self).tearDown()
405
self._server.tearDown()
407
def get_transport(self):
408
"""Return a connected transport to the local directory."""
581
self.start_server(self._server)
583
def get_transport(self, relpath=None):
584
"""Return a connected transport to the local directory.
586
:param relpath: a path relative to the base url.
409
588
base_url = self._server.get_url()
589
url = self._adjust_url(base_url, relpath)
410
590
# try getting the transport via the regular interface:
411
t = get_transport(base_url)
412
if not isinstance(t, self.transport_class):
591
t = get_transport(url)
592
# vila--20070607 if the following are commented out the test suite
593
# still pass. Is this really still needed or was it a forgotten
595
if not isinstance(t, self.transport_class):
413
596
# we did not get the correct transport class type. Override the
414
597
# regular connection behaviour by direct construction.
415
t = self.transport_class(base_url)
598
t = self.transport_class(url)
602
class TestLocalTransports(TestCase):
604
def test_get_transport_from_abspath(self):
605
here = osutils.abspath('.')
606
t = get_transport(here)
607
self.assertIsInstance(t, LocalTransport)
608
self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
610
def test_get_transport_from_relpath(self):
611
here = osutils.abspath('.')
612
t = get_transport('.')
613
self.assertIsInstance(t, LocalTransport)
614
self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
616
def test_get_transport_from_local_url(self):
617
here = osutils.abspath('.')
618
here_url = urlutils.local_path_to_url(here) + '/'
619
t = get_transport(here_url)
620
self.assertIsInstance(t, LocalTransport)
621
self.assertEquals(t.base, here_url)
623
def test_local_abspath(self):
624
here = osutils.abspath('.')
625
t = get_transport(here)
626
self.assertEquals(t.local_abspath(''), here)
629
class TestWin32LocalTransport(TestCase):
631
def test_unc_clone_to_root(self):
632
# Win32 UNC path like \\HOST\path
633
# clone to root should stop at least at \\HOST part
635
t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
638
self.assertEquals(t.base, 'file://HOST/')
639
# make sure we reach the root
641
self.assertEquals(t.base, 'file://HOST/')
644
class TestConnectedTransport(TestCase):
645
"""Tests for connected to remote server transports"""
647
def test_parse_url(self):
648
t = ConnectedTransport('http://simple.example.com/home/source')
649
self.assertEquals(t._host, 'simple.example.com')
650
self.assertEquals(t._port, None)
651
self.assertEquals(t._path, '/home/source/')
652
self.failUnless(t._user is None)
653
self.failUnless(t._password is None)
655
self.assertEquals(t.base, 'http://simple.example.com/home/source/')
657
def test_parse_url_with_at_in_user(self):
659
t = ConnectedTransport('ftp://user@host.com@www.host.com/')
660
self.assertEquals(t._user, 'user@host.com')
662
def test_parse_quoted_url(self):
663
t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
664
self.assertEquals(t._host, 'exAmple.com')
665
self.assertEquals(t._port, 2222)
666
self.assertEquals(t._user, 'robey')
667
self.assertEquals(t._password, 'h@t')
668
self.assertEquals(t._path, '/path/')
670
# Base should not keep track of the password
671
self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
673
def test_parse_invalid_url(self):
674
self.assertRaises(errors.InvalidURL,
676
'sftp://lily.org:~janneke/public/bzr/gub')
678
def test_relpath(self):
679
t = ConnectedTransport('sftp://user@host.com/abs/path')
681
self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
682
self.assertRaises(errors.PathNotChild, t.relpath,
683
'http://user@host.com/abs/path/sub')
684
self.assertRaises(errors.PathNotChild, t.relpath,
685
'sftp://user2@host.com/abs/path/sub')
686
self.assertRaises(errors.PathNotChild, t.relpath,
687
'sftp://user@otherhost.com/abs/path/sub')
688
self.assertRaises(errors.PathNotChild, t.relpath,
689
'sftp://user@host.com:33/abs/path/sub')
690
# Make sure it works when we don't supply a username
691
t = ConnectedTransport('sftp://host.com/abs/path')
692
self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
694
# Make sure it works when parts of the path will be url encoded
695
t = ConnectedTransport('sftp://host.com/dev/%path')
696
self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
698
def test_connection_sharing_propagate_credentials(self):
699
t = ConnectedTransport('ftp://user@host.com/abs/path')
700
self.assertEquals('user', t._user)
701
self.assertEquals('host.com', t._host)
702
self.assertIs(None, t._get_connection())
703
self.assertIs(None, t._password)
704
c = t.clone('subdir')
705
self.assertIs(None, c._get_connection())
706
self.assertIs(None, t._password)
708
# Simulate the user entering a password
710
connection = object()
711
t._set_connection(connection, password)
712
self.assertIs(connection, t._get_connection())
713
self.assertIs(password, t._get_credentials())
714
self.assertIs(connection, c._get_connection())
715
self.assertIs(password, c._get_credentials())
717
# credentials can be updated
718
new_password = 'even more secret'
719
c._update_credentials(new_password)
720
self.assertIs(connection, t._get_connection())
721
self.assertIs(new_password, t._get_credentials())
722
self.assertIs(connection, c._get_connection())
723
self.assertIs(new_password, c._get_credentials())
726
class TestReusedTransports(TestCase):
727
"""Tests for transport reuse"""
729
def test_reuse_same_transport(self):
730
possible_transports = []
731
t1 = get_transport('http://foo/',
732
possible_transports=possible_transports)
733
self.assertEqual([t1], possible_transports)
734
t2 = get_transport('http://foo/', possible_transports=[t1])
735
self.assertIs(t1, t2)
737
# Also check that final '/' are handled correctly
738
t3 = get_transport('http://foo/path/')
739
t4 = get_transport('http://foo/path', possible_transports=[t3])
740
self.assertIs(t3, t4)
742
t5 = get_transport('http://foo/path')
743
t6 = get_transport('http://foo/path/', possible_transports=[t5])
744
self.assertIs(t5, t6)
746
def test_don_t_reuse_different_transport(self):
747
t1 = get_transport('http://foo/path')
748
t2 = get_transport('http://bar/path', possible_transports=[t1])
749
self.assertIsNot(t1, t2)
752
class TestTransportTrace(TestCase):
755
transport = get_transport('trace+memory://')
756
self.assertIsInstance(
757
transport, bzrlib.transport.trace.TransportTraceDecorator)
759
def test_clone_preserves_activity(self):
760
transport = get_transport('trace+memory://')
761
transport2 = transport.clone('.')
762
self.assertTrue(transport is not transport2)
763
self.assertTrue(transport._activity is transport2._activity)
765
# the following specific tests are for the operations that have made use of
766
# logging in tests; we could test every single operation but doing that
767
# still won't cause a test failure when the top level Transport API
768
# changes; so there is little return doing that.
770
transport = get_transport('trace+memory:///')
771
transport.put_bytes('foo', 'barish')
774
# put_bytes records the bytes, not the content to avoid memory
776
expected_result.append(('put_bytes', 'foo', 6, None))
777
# get records the file name only.
778
expected_result.append(('get', 'foo'))
779
self.assertEqual(expected_result, transport._activity)
781
def test_readv(self):
782
transport = get_transport('trace+memory:///')
783
transport.put_bytes('foo', 'barish')
784
list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
787
# put_bytes records the bytes, not the content to avoid memory
789
expected_result.append(('put_bytes', 'foo', 6, None))
790
# readv records the supplied offset request
791
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
792
self.assertEqual(expected_result, transport._activity)