59
52
def test__get_set_protocol_handlers(self):
60
53
handlers = _get_protocol_handlers()
61
self.assertNotEqual([], handlers.keys( ))
54
self.assertNotEqual({}, handlers)
63
_clear_protocol_handlers()
64
self.assertEqual([], _get_protocol_handlers().keys())
56
_set_protocol_handlers({})
57
self.assertEqual({}, _get_protocol_handlers())
66
59
_set_protocol_handlers(handlers)
68
61
def test_get_transport_modules(self):
69
62
handlers = _get_protocol_handlers()
70
# don't pollute the current handlers
71
_clear_protocol_handlers()
72
63
class SampleHandler(object):
73
64
"""I exist, isnt that enough?"""
75
_clear_protocol_handlers()
76
register_transport_proto('foo')
77
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
'TestTransport.SampleHandler')
79
register_transport_proto('bar')
80
register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
'TestTransport.SampleHandler')
82
self.assertEqual([SampleHandler.__module__,
83
'bzrlib.transport.chroot'],
67
_set_protocol_handlers(my_handlers)
68
register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
69
register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
70
self.assertEqual([SampleHandler.__module__],
84
71
_get_transport_modules())
86
73
_set_protocol_handlers(handlers)
210
175
], [(10, 10), (30, 10), (100, 10)],
213
def test_coalesce_max_size(self):
214
self.check([(10, 20, [(0, 10), (10, 10)]),
216
# If one range is above max_size, it gets its own coalesced
218
(100, 80, [(0, 80),]),],
219
[(10, 10), (20, 10), (30, 50), (100, 80)],
223
def test_coalesce_no_max_size(self):
224
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
225
[(10, 10), (20, 10), (30, 50), (80, 100)],
229
180
class TestMemoryTransport(TestCase):
337
289
class ChrootDecoratorTransportTest(TestCase):
338
290
"""Chroot decoration specific tests."""
340
def test_abspath(self):
341
# The abspath is always relative to the chroot_url.
342
server = ChrootServer(get_transport('memory:///foo/bar/'))
344
transport = get_transport(server.get_url())
345
self.assertEqual(server.get_url(), transport.abspath('/'))
347
subdir_transport = transport.clone('subdir')
348
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
351
def test_clone(self):
352
server = ChrootServer(get_transport('memory:///foo/bar/'))
354
transport = get_transport(server.get_url())
355
# relpath from root and root path are the same
356
relpath_cloned = transport.clone('foo')
357
abspath_cloned = transport.clone('/foo')
358
self.assertEqual(server, relpath_cloned.server)
359
self.assertEqual(server, abspath_cloned.server)
362
def test_chroot_url_preserves_chroot(self):
363
"""Calling get_transport on a chroot transport's base should produce a
364
transport with exactly the same behaviour as the original chroot
367
This is so that it is not possible to escape a chroot by doing::
368
url = chroot_transport.base
369
parent_url = urlutils.join(url, '..')
370
new_transport = get_transport(parent_url)
372
server = ChrootServer(get_transport('memory:///path/subpath'))
374
transport = get_transport(server.get_url())
375
new_transport = get_transport(transport.base)
376
self.assertEqual(transport.server, new_transport.server)
377
self.assertEqual(transport.base, new_transport.base)
380
def test_urljoin_preserves_chroot(self):
381
"""Using urlutils.join(url, '..') on a chroot URL should not produce a
382
URL that escapes the intended chroot.
384
This is so that it is not possible to escape a chroot by doing::
385
url = chroot_transport.base
386
parent_url = urlutils.join(url, '..')
387
new_transport = get_transport(parent_url)
389
server = ChrootServer(get_transport('memory:///path/'))
391
transport = get_transport(server.get_url())
393
InvalidURLJoin, urlutils.join, transport.base, '..')
397
class ChrootServerTest(TestCase):
399
292
def test_construct(self):
400
backing_transport = MemoryTransport()
401
server = ChrootServer(backing_transport)
402
self.assertEqual(backing_transport, server.backing_transport)
404
def test_setUp(self):
405
backing_transport = MemoryTransport()
406
server = ChrootServer(backing_transport)
408
self.assertTrue(server.scheme in _get_protocol_handlers().keys())
410
def test_tearDown(self):
411
backing_transport = MemoryTransport()
412
server = ChrootServer(backing_transport)
415
self.assertFalse(server.scheme in _get_protocol_handlers().keys())
417
def test_get_url(self):
418
backing_transport = MemoryTransport()
419
server = ChrootServer(backing_transport)
421
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
293
from bzrlib.transport import chroot
294
transport = chroot.ChrootTransportDecorator('chroot+memory:///pathA/')
295
self.assertEqual('memory:///pathA/', transport.chroot_url)
297
transport = chroot.ChrootTransportDecorator(
298
'chroot+memory:///path/B', chroot='memory:///path/')
299
self.assertEqual('memory:///path/', transport.chroot_url)
301
def test_append_file(self):
302
transport = get_transport('chroot+memory:///foo/bar')
303
self.assertRaises(PathNotChild, transport.append_file, '/foo', None)
305
def test_append_bytes(self):
306
transport = get_transport('chroot+memory:///foo/bar')
307
self.assertRaises(PathNotChild, transport.append_bytes, '/foo', 'bytes')
309
def test_clone(self):
310
transport = get_transport('chroot+memory:///foo/bar')
311
self.assertRaises(PathNotChild, transport.clone, '/foo')
313
def test_delete(self):
314
transport = get_transport('chroot+memory:///foo/bar')
315
self.assertRaises(PathNotChild, transport.delete, '/foo')
317
def test_delete_tree(self):
318
transport = get_transport('chroot+memory:///foo/bar')
319
self.assertRaises(PathNotChild, transport.delete_tree, '/foo')
322
transport = get_transport('chroot+memory:///foo/bar')
323
self.assertRaises(PathNotChild, transport.get, '/foo')
325
def test_get_bytes(self):
326
transport = get_transport('chroot+memory:///foo/bar')
327
self.assertRaises(PathNotChild, transport.get_bytes, '/foo')
330
transport = get_transport('chroot+memory:///foo/bar')
331
self.assertRaises(PathNotChild, transport.has, '/foo')
333
def test_list_dir(self):
334
transport = get_transport('chroot+memory:///foo/bar')
335
self.assertRaises(PathNotChild, transport.list_dir, '/foo')
337
def test_lock_read(self):
338
transport = get_transport('chroot+memory:///foo/bar')
339
self.assertRaises(PathNotChild, transport.lock_read, '/foo')
341
def test_lock_write(self):
342
transport = get_transport('chroot+memory:///foo/bar')
343
self.assertRaises(PathNotChild, transport.lock_write, '/foo')
345
def test_mkdir(self):
346
transport = get_transport('chroot+memory:///foo/bar')
347
self.assertRaises(PathNotChild, transport.mkdir, '/foo')
349
def test_put_bytes(self):
350
transport = get_transport('chroot+memory:///foo/bar')
351
self.assertRaises(PathNotChild, transport.put_bytes, '/foo', 'bytes')
353
def test_put_file(self):
354
transport = get_transport('chroot+memory:///foo/bar')
355
self.assertRaises(PathNotChild, transport.put_file, '/foo', None)
357
def test_rename(self):
358
transport = get_transport('chroot+memory:///foo/bar')
359
self.assertRaises(PathNotChild, transport.rename, '/aaa', 'bbb')
360
self.assertRaises(PathNotChild, transport.rename, 'ccc', '/d')
362
def test_rmdir(self):
363
transport = get_transport('chroot+memory:///foo/bar')
364
self.assertRaises(PathNotChild, transport.rmdir, '/foo')
367
transport = get_transport('chroot+memory:///foo/bar')
368
self.assertRaises(PathNotChild, transport.stat, '/foo')
425
371
class ReadonlyDecoratorTransportTest(TestCase):
565
515
super(TestTransportImplementation, self).setUp()
566
516
self._server = self.transport_server()
567
517
self._server.setUp()
568
self.addCleanup(self._server.tearDown)
570
def get_transport(self, relpath=None):
571
"""Return a connected transport to the local directory.
573
:param relpath: a path relative to the base url.
520
super(TestTransportImplementation, self).tearDown()
521
self._server.tearDown()
523
def get_transport(self):
524
"""Return a connected transport to the local directory."""
575
525
base_url = self._server.get_url()
576
url = self._adjust_url(base_url, relpath)
577
526
# try getting the transport via the regular interface:
578
t = get_transport(url)
579
# vila--20070607 if the following are commented out the test suite
580
# still pass. Is this really still needed or was it a forgotten
527
t = get_transport(base_url)
582
528
if not isinstance(t, self.transport_class):
583
529
# we did not get the correct transport class type. Override the
584
530
# regular connection behaviour by direct construction.
585
t = self.transport_class(url)
531
t = self.transport_class(base_url)
589
535
class TestLocalTransports(TestCase):
591
537
def test_get_transport_from_abspath(self):
592
here = osutils.abspath('.')
538
here = os.path.abspath('.')
593
539
t = get_transport(here)
594
540
self.assertIsInstance(t, LocalTransport)
595
541
self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
597
543
def test_get_transport_from_relpath(self):
598
here = osutils.abspath('.')
544
here = os.path.abspath('.')
599
545
t = get_transport('.')
600
546
self.assertIsInstance(t, LocalTransport)
601
547
self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
603
549
def test_get_transport_from_local_url(self):
604
here = osutils.abspath('.')
550
here = os.path.abspath('.')
605
551
here_url = urlutils.local_path_to_url(here) + '/'
606
552
t = get_transport(here_url)
607
553
self.assertIsInstance(t, LocalTransport)
608
554
self.assertEquals(t.base, here_url)
610
def test_local_abspath(self):
611
here = osutils.abspath('.')
612
t = get_transport(here)
613
self.assertEquals(t.local_abspath(''), here)
616
557
class TestWin32LocalTransport(TestCase):
626
567
# make sure we reach the root
627
568
t = t.clone('..')
628
569
self.assertEquals(t.base, 'file://HOST/')
631
class TestConnectedTransport(TestCase):
632
"""Tests for connected to remote server transports"""
634
def test_parse_url(self):
635
t = ConnectedTransport('http://simple.example.com/home/source')
636
self.assertEquals(t._host, 'simple.example.com')
637
self.assertEquals(t._port, None)
638
self.assertEquals(t._path, '/home/source/')
639
self.failUnless(t._user is None)
640
self.failUnless(t._password is None)
642
self.assertEquals(t.base, 'http://simple.example.com/home/source/')
644
def test_parse_url_with_at_in_user(self):
646
t = ConnectedTransport('ftp://user@host.com@www.host.com/')
647
self.assertEquals(t._user, 'user@host.com')
649
def test_parse_quoted_url(self):
650
t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
651
self.assertEquals(t._host, 'exAmple.com')
652
self.assertEquals(t._port, 2222)
653
self.assertEquals(t._user, 'robey')
654
self.assertEquals(t._password, 'h@t')
655
self.assertEquals(t._path, '/path/')
657
# Base should not keep track of the password
658
self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
660
def test_parse_invalid_url(self):
661
self.assertRaises(errors.InvalidURL,
663
'sftp://lily.org:~janneke/public/bzr/gub')
665
def test_relpath(self):
666
t = ConnectedTransport('sftp://user@host.com/abs/path')
668
self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
669
self.assertRaises(errors.PathNotChild, t.relpath,
670
'http://user@host.com/abs/path/sub')
671
self.assertRaises(errors.PathNotChild, t.relpath,
672
'sftp://user2@host.com/abs/path/sub')
673
self.assertRaises(errors.PathNotChild, t.relpath,
674
'sftp://user@otherhost.com/abs/path/sub')
675
self.assertRaises(errors.PathNotChild, t.relpath,
676
'sftp://user@host.com:33/abs/path/sub')
677
# Make sure it works when we don't supply a username
678
t = ConnectedTransport('sftp://host.com/abs/path')
679
self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
681
# Make sure it works when parts of the path will be url encoded
682
t = ConnectedTransport('sftp://host.com/dev/%path')
683
self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
685
def test_connection_sharing_propagate_credentials(self):
686
t = ConnectedTransport('ftp://user@host.com/abs/path')
687
self.assertEquals('user', t._user)
688
self.assertEquals('host.com', t._host)
689
self.assertIs(None, t._get_connection())
690
self.assertIs(None, t._password)
691
c = t.clone('subdir')
692
self.assertIs(None, c._get_connection())
693
self.assertIs(None, t._password)
695
# Simulate the user entering a password
697
connection = object()
698
t._set_connection(connection, password)
699
self.assertIs(connection, t._get_connection())
700
self.assertIs(password, t._get_credentials())
701
self.assertIs(connection, c._get_connection())
702
self.assertIs(password, c._get_credentials())
704
# credentials can be updated
705
new_password = 'even more secret'
706
c._update_credentials(new_password)
707
self.assertIs(connection, t._get_connection())
708
self.assertIs(new_password, t._get_credentials())
709
self.assertIs(connection, c._get_connection())
710
self.assertIs(new_password, c._get_credentials())
713
class TestReusedTransports(TestCase):
714
"""Tests for transport reuse"""
716
def test_reuse_same_transport(self):
717
possible_transports = []
718
t1 = get_transport('http://foo/',
719
possible_transports=possible_transports)
720
self.assertEqual([t1], possible_transports)
721
t2 = get_transport('http://foo/', possible_transports=[t1])
722
self.assertIs(t1, t2)
724
# Also check that final '/' are handled correctly
725
t3 = get_transport('http://foo/path/')
726
t4 = get_transport('http://foo/path', possible_transports=[t3])
727
self.assertIs(t3, t4)
729
t5 = get_transport('http://foo/path')
730
t6 = get_transport('http://foo/path/', possible_transports=[t5])
731
self.assertIs(t5, t6)
733
def test_don_t_reuse_different_transport(self):
734
t1 = get_transport('http://foo/path')
735
t2 = get_transport('http://bar/path', possible_transports=[t1])
736
self.assertIsNot(t1, t2)
739
class TestTransportTrace(TestCase):
742
transport = get_transport('trace+memory://')
743
self.assertIsInstance(
744
transport, bzrlib.transport.trace.TransportTraceDecorator)
746
def test_clone_preserves_activity(self):
747
transport = get_transport('trace+memory://')
748
transport2 = transport.clone('.')
749
self.assertTrue(transport is not transport2)
750
self.assertTrue(transport._activity is transport2._activity)
752
# the following specific tests are for the operations that have made use of
753
# logging in tests; we could test every single operation but doing that
754
# still won't cause a test failure when the top level Transport API
755
# changes; so there is little return doing that.
757
transport = get_transport('trace+memory:///')
758
transport.put_bytes('foo', 'barish')
761
# put_bytes records the bytes, not the content to avoid memory
763
expected_result.append(('put_bytes', 'foo', 6, None))
764
# get records the file name only.
765
expected_result.append(('get', 'foo'))
766
self.assertEqual(expected_result, transport._activity)
768
def test_readv(self):
769
transport = get_transport('trace+memory:///')
770
transport.put_bytes('foo', 'barish')
771
list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
774
# put_bytes records the bytes, not the content to avoid memory
776
expected_result.append(('put_bytes', 'foo', 6, None))
777
# readv records the supplied offset request
778
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
779
self.assertEqual(expected_result, transport._activity)