~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport_implementations.py

Merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
import sys
27
27
 
28
28
from bzrlib.errors import (NoSuchFile, FileExists,
 
29
                           LockError,
29
30
                           TransportNotPossible, ConnectionError)
30
31
from bzrlib.tests import TestCaseInTempDir, TestSkipped
31
32
from bzrlib.transport import memory, urlescape
257
258
        self.assertTransportMode(t, 'mdmode755', 0755)
258
259
 
259
260
    def test_copy_to(self):
 
261
        # FIXME: test:   same server to same server (partly done)
 
262
        # same protocol two servers
 
263
        # and    different protocols (done for now except for MemoryTransport).
 
264
        # - RBC 20060122
260
265
        from bzrlib.transport.memory import MemoryTransport
 
266
 
 
267
        def simple_copy_files(transport_from, transport_to):
 
268
            files = ['a', 'b', 'c', 'd']
 
269
            self.build_tree(files, transport=transport_from)
 
270
            transport_from.copy_to(files, transport_to)
 
271
            for f in files:
 
272
                self.check_transport_contents(transport_to.get(f).read(),
 
273
                                              transport_from, f)
 
274
 
261
275
        t = self.get_transport()
262
 
 
263
 
        files = ['a', 'b', 'c', 'd']
264
 
        self.build_tree(files, transport=t)
265
 
 
266
276
        temp_transport = MemoryTransport('memory:/')
 
277
        simple_copy_files(t, temp_transport)
 
278
        if not t.is_readonly():
 
279
            t.mkdir('copy_to_simple')
 
280
            t2 = t.clone('copy_to_simple')
 
281
            simple_copy_files(t, t2)
267
282
 
268
 
        t.copy_to(files, temp_transport)
269
 
        for f in files:
270
 
            self.check_transport_contents(temp_transport.get(f).read(),
271
 
                                          t, f)
272
283
 
273
284
        # Test that copying into a missing directory raises
274
285
        # NoSuchFile
484
495
        self.check_transport_contents(t.get('f2').read(), t, 'c')
485
496
        self.check_transport_contents(t.get('f3').read(), t, 'd')
486
497
 
487
 
 
488
498
    def test_delete(self):
489
499
        # TODO: Test Transport.delete
490
500
        t = self.get_transport()
491
501
 
492
502
        # Not much to do with a readonly transport
493
503
        if t.is_readonly():
 
504
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
494
505
            return
495
506
 
496
507
        t.put('a', StringIO('a little bit of text\n'))
528
539
        # plain "listdir".
529
540
        # self.assertEqual([], os.listdir('.'))
530
541
 
 
542
    def test_rmdir(self):
 
543
        t = self.get_transport()
 
544
        # Not much to do with a readonly transport
 
545
        if t.is_readonly():
 
546
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
 
547
            return
 
548
        t.mkdir('adir')
 
549
        t.mkdir('adir/bdir')
 
550
        t.rmdir('adir/bdir')
 
551
        self.assertRaises(NoSuchFile, t.stat, 'adir/bdir')
 
552
        t.rmdir('adir')
 
553
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
554
 
 
555
    def test_delete_tree(self):
 
556
        t = self.get_transport()
 
557
 
 
558
        # Not much to do with a readonly transport
 
559
        if t.is_readonly():
 
560
            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
 
561
            return
 
562
 
 
563
        # and does it like listing ?
 
564
        t.mkdir('adir')
 
565
        try:
 
566
            t.delete_tree('adir')
 
567
        except TransportNotPossible:
 
568
            # ok, this transport does not support delete_tree
 
569
            return
 
570
        
 
571
        # did it delete that trivial case?
 
572
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
573
 
 
574
        self.build_tree(['adir/',
 
575
                         'adir/file', 
 
576
                         'adir/subdir/', 
 
577
                         'adir/subdir/file', 
 
578
                         'adir/subdir2/',
 
579
                         'adir/subdir2/file',
 
580
                         ], transport=t)
 
581
 
 
582
        t.delete_tree('adir')
 
583
        # adir should be gone now.
 
584
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
585
 
531
586
    def test_move(self):
532
587
        t = self.get_transport()
533
588
 
630
685
 
631
686
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
632
687
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
 
688
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
 
689
        subdir = t.clone('subdir')
 
690
        subdir.stat('./file')
 
691
        subdir.stat('.')
633
692
 
634
693
    def test_list_dir(self):
635
694
        # TODO: Test list_dir, just try once, and if it throws, stop testing
646
705
 
647
706
        # SftpServer creates control files in the working directory
648
707
        # so let's move down a directory to avoid those.
649
 
        t.mkdir('wd')
 
708
        if not t.is_readonly():
 
709
            t.mkdir('wd')
 
710
        else:
 
711
            os.mkdir('wd')
650
712
        t = t.clone('wd')
651
713
 
652
714
        self.assertEqual([], sorted_list(u'.'))
653
 
        self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e'], transport=t)
 
715
        # c2 is precisely one letter longer than c here to test that
 
716
        # suffixing is not confused.
 
717
        if not t.is_readonly():
 
718
            self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e', 'c2/'], transport=t)
 
719
        else:
 
720
            self.build_tree(['wd/a', 'wd/b', 'wd/c/', 'wd/c/d', 'wd/c/e', 'wd/c2/'])
654
721
 
655
 
        self.assertEqual([u'a', u'b', u'c'], sorted_list(u'.'))
 
722
        self.assertEqual([u'a', u'b', u'c', u'c2'], sorted_list(u'.'))
656
723
        self.assertEqual([u'd', u'e'], sorted_list(u'c'))
657
724
 
658
 
        t.delete('c/d')
659
 
        t.delete('b')
660
 
        self.assertEqual([u'a', u'c'], sorted_list('.'))
 
725
        if not t.is_readonly():
 
726
            t.delete('c/d')
 
727
            t.delete('b')
 
728
        else:
 
729
            os.unlink('wd/c/d')
 
730
            os.unlink('wd/b')
 
731
            
 
732
        self.assertEqual([u'a', u'c', u'c2'], sorted_list('.'))
661
733
        self.assertEqual([u'e'], sorted_list(u'c'))
662
734
 
663
735
        self.assertListRaises(NoSuchFile, t.list_dir, 'q')
730
802
        transport = transport.clone('isolated')
731
803
        paths = set(transport.iter_files_recursive())
732
804
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
805
 
 
806
    def test_connect_twice_is_same_content(self):
 
807
        # check that our server (whatever it is) is accessible reliably
 
808
        # via get_transport and multiple connections share content.
 
809
        transport = self.get_transport()
 
810
        if transport.is_readonly():
 
811
            return
 
812
        transport.put('foo', StringIO('bar'))
 
813
        transport2 = self.get_transport()
 
814
        self.check_transport_contents('bar', transport2, 'foo')
 
815
        # its base should be usable.
 
816
        transport2 = bzrlib.transport.get_transport(transport.base)
 
817
        self.check_transport_contents('bar', transport2, 'foo')
 
818
 
 
819
        # now opening at a relative url should give us a sane result:
 
820
        transport.mkdir('newdir')
 
821
        transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
 
822
        transport2 = transport2.clone('..')
 
823
        self.check_transport_contents('bar', transport2, 'foo')
 
824
 
 
825
    def test_lock_write(self):
 
826
        transport = self.get_transport()
 
827
        if transport.is_readonly():
 
828
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
 
829
            return
 
830
        transport.put('lock', StringIO())
 
831
        lock = transport.lock_write('lock')
 
832
        # TODO make this consistent on all platforms:
 
833
        # self.assertRaises(LockError, transport.lock_write, 'lock')
 
834
        lock.unlock()
 
835
 
 
836
    def test_lock_read(self):
 
837
        transport = self.get_transport()
 
838
        if transport.is_readonly():
 
839
            file('lock', 'w').close()
 
840
        else:
 
841
            transport.put('lock', StringIO())
 
842
        lock = transport.lock_read('lock')
 
843
        # TODO make this consistent on all platforms:
 
844
        # self.assertRaises(LockError, transport.lock_read, 'lock')
 
845
        lock.unlock()