28
28
from bzrlib.errors import (NoSuchFile, FileExists,
29
30
TransportNotPossible, ConnectionError)
30
31
from bzrlib.tests import TestCaseInTempDir, TestSkipped
31
32
from bzrlib.transport import memory, urlescape
257
258
self.assertTransportMode(t, 'mdmode755', 0755)
259
260
def test_copy_to(self):
261
# FIXME: test: same server to same server (partly done)
262
# same protocol two servers
263
# and different protocols (done for now except for MemoryTransport.
260
265
from bzrlib.transport.memory import MemoryTransport
267
def simple_copy_files(transport_from, transport_to):
268
files = ['a', 'b', 'c', 'd']
269
self.build_tree(files, transport=transport_from)
270
transport_from.copy_to(files, transport_to)
272
self.check_transport_contents(transport_to.get(f).read(),
261
275
t = self.get_transport()
263
files = ['a', 'b', 'c', 'd']
264
self.build_tree(files, transport=t)
266
276
temp_transport = MemoryTransport('memory:/')
277
simple_copy_files(t, temp_transport)
278
if not t.is_readonly():
279
t.mkdir('copy_to_simple')
280
t2 = t.clone('copy_to_simple')
281
simple_copy_files(t, t2)
268
t.copy_to(files, temp_transport)
270
self.check_transport_contents(temp_transport.get(f).read(),
273
284
# Test that copying into a missing directory raises
484
495
self.check_transport_contents(t.get('f2').read(), t, 'c')
485
496
self.check_transport_contents(t.get('f3').read(), t, 'd')
488
498
def test_delete(self):
489
499
# TODO: Test Transport.delete
490
500
t = self.get_transport()
492
502
# Not much to do with a readonly transport
493
503
if t.is_readonly():
504
self.assertRaises(TransportNotPossible, t.delete, 'missing')
496
507
t.put('a', StringIO('a little bit of text\n'))
528
539
# plain "listdir".
529
540
# self.assertEqual([], os.listdir('.'))
542
def test_rmdir(self):
543
t = self.get_transport()
544
# Not much to do with a readonly transport
546
self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
551
self.assertRaises(NoSuchFile, t.stat, 'adir/bdir')
553
self.assertRaises(NoSuchFile, t.stat, 'adir')
555
def test_delete_tree(self):
556
t = self.get_transport()
558
# Not much to do with a readonly transport
560
self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
563
# and does it like listing ?
566
t.delete_tree('adir')
567
except TransportNotPossible:
568
# ok, this transport does not support delete_tree
571
# did it delete that trivial case?
572
self.assertRaises(NoSuchFile, t.stat, 'adir')
574
self.build_tree(['adir/',
582
t.delete_tree('adir')
583
# adir should be gone now.
584
self.assertRaises(NoSuchFile, t.stat, 'adir')
531
586
def test_move(self):
532
587
t = self.get_transport()
631
686
self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
632
687
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
688
self.build_tree(['subdir/', 'subdir/file'], transport=t)
689
subdir = t.clone('subdir')
690
subdir.stat('./file')
634
693
def test_list_dir(self):
635
694
# TODO: Test list_dir, just try once, and if it throws, stop testing
647
706
# SftpServer creates control files in the working directory
648
707
# so lets move down a directory to avoid those.
708
if not t.is_readonly():
650
712
t = t.clone('wd')
652
714
self.assertEqual([], sorted_list(u'.'))
653
self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e'], transport=t)
715
# c2 is precisely one letter longer than c here to test that
716
# suffixing is not confused.
717
if not t.is_readonly():
718
self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e', 'c2/'], transport=t)
720
self.build_tree(['wd/a', 'wd/b', 'wd/c/', 'wd/c/d', 'wd/c/e', 'wd/c2/'])
655
self.assertEqual([u'a', u'b', u'c'], sorted_list(u'.'))
722
self.assertEqual([u'a', u'b', u'c', u'c2'], sorted_list(u'.'))
656
723
self.assertEqual([u'd', u'e'], sorted_list(u'c'))
660
self.assertEqual([u'a', u'c'], sorted_list('.'))
725
if not t.is_readonly():
732
self.assertEqual([u'a', u'c', u'c2'], sorted_list('.'))
661
733
self.assertEqual([u'e'], sorted_list(u'c'))
663
735
self.assertListRaises(NoSuchFile, t.list_dir, 'q')
730
802
transport = transport.clone('isolated')
731
803
paths = set(transport.iter_files_recursive())
732
804
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
806
def test_connect_twice_is_same_content(self):
807
# check that our server (whatever it is) is accessable reliably
808
# via get_transport and multiple connections share content.
809
transport = self.get_transport()
810
if transport.is_readonly():
812
transport.put('foo', StringIO('bar'))
813
transport2 = self.get_transport()
814
self.check_transport_contents('bar', transport2, 'foo')
815
# its base should be usable.
816
transport2 = bzrlib.transport.get_transport(transport.base)
817
self.check_transport_contents('bar', transport2, 'foo')
819
# now opening at a relative url should give use a sane result:
820
transport.mkdir('newdir')
821
transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
822
transport2 = transport2.clone('..')
823
self.check_transport_contents('bar', transport2, 'foo')
825
def test_lock_write(self):
826
transport = self.get_transport()
827
if transport.is_readonly():
828
self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
830
transport.put('lock', StringIO())
831
lock = transport.lock_write('lock')
832
# TODO make this consistent on all platforms:
833
# self.assertRaises(LockError, transport.lock_write, 'lock')
836
def test_lock_read(self):
837
transport = self.get_transport()
838
if transport.is_readonly():
839
file('lock', 'w').close()
841
transport.put('lock', StringIO())
842
lock = transport.lock_read('lock')
843
# TODO make this consistent on all platforms:
844
# self.assertRaises(LockError, transport.lock_read, 'lock')