~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

Expanded the Transport test suite. Including delete, copy, move, etc. Updated SftpTransport to conform.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
import os
19
19
from cStringIO import StringIO
20
20
 
21
 
from bzrlib.errors import (NoSuchFile, FileExists, TransportNotPossible,
22
 
                           ConnectionError)
 
21
from bzrlib.errors import (NoSuchFile, FileExists,
 
22
                           TransportNotPossible, ConnectionError)
23
23
from bzrlib.tests import TestCase, TestCaseInTempDir
24
24
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
25
25
from bzrlib.transport import memory, urlescape
56
56
        """
57
57
        raise NotImplementedError
58
58
 
 
59
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
60
        """Many transport functions can return generators this makes sure
 
61
        to wrap them in a list() call to make sure the whole generator
 
62
        is run, and that the proper exception is raised.
 
63
        """
 
64
        try:
 
65
            list(func(*args, **kwargs))
 
66
        except excClass:
 
67
            return
 
68
        else:
 
69
            if hasattr(excClass,'__name__'): excName = excClass.__name__
 
70
            else: excName = str(excClass)
 
71
            raise self.failureException, "%s not raised" % excName
 
72
 
59
73
    def test_has(self):
60
74
        t = self.get_transport()
61
75
 
62
76
        files = ['a', 'b', 'e', 'g', '%']
63
77
        self.build_tree(files)
64
 
        self.assertEqual(t.has('a'), True)
65
 
        self.assertEqual(t.has('c'), False)
66
 
        self.assertEqual(t.has(urlescape('%')), True)
 
78
        self.assertEqual(True, t.has('a'))
 
79
        self.assertEqual(False, t.has('c'))
 
80
        self.assertEqual(True, t.has(urlescape('%')))
67
81
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
68
82
                [True, True, False, False, True, False, True, False])
69
 
        self.assertEqual(t.has_any(['a', 'b', 'c']), True)
70
 
        self.assertEqual(t.has_any(['c', 'd', 'f', urlescape('%%')]), False)
 
83
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
84
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlescape('%%')]))
71
85
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
72
86
                [True, True, False, False, True, False, True, False])
73
 
        self.assertEqual(t.has_any(['c', 'c', 'c']), False)
74
 
        self.assertEqual(t.has_any(['b', 'b', 'b']), True)
 
87
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
88
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
75
89
 
76
90
    def test_get(self):
77
91
        t = self.get_transport()
78
92
 
79
93
        files = ['a', 'b', 'e', 'g']
80
94
        self.build_tree(files)
81
 
        self.assertEqual(t.get('a').read(), open('a').read())
 
95
        self.assertEqual(open('a', 'rb').read(), t.get('a').read())
82
96
        content_f = t.get_multi(files)
83
97
        for path,f in zip(files, content_f):
84
98
            self.assertEqual(open(path).read(), f.read())
85
99
 
86
100
        content_f = t.get_multi(iter(files))
87
101
        for path,f in zip(files, content_f):
88
 
            self.assertEqual(open(path).read(), f.read())
 
102
            self.assertEqual(f.read(), open(path).read())
89
103
 
90
104
        self.assertRaises(NoSuchFile, t.get, 'c')
91
 
        try:
92
 
            files = list(t.get_multi(['a', 'b', 'c']))
93
 
        except NoSuchFile:
94
 
            pass
95
 
        else:
96
 
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
97
 
        try:
98
 
            files = list(t.get_multi(iter(['a', 'b', 'c', 'e'])))
99
 
        except NoSuchFile:
100
 
            pass
101
 
        else:
102
 
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
105
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
 
106
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
103
107
 
104
108
    def test_put(self):
105
109
        t = self.get_transport()
208
212
        self.check_file_contents('f5', 'here is some text\nand a bit more\n')
209
213
        self.check_file_contents('f6', 'some text for the\nthird file created\n')
210
214
 
211
 
 
212
 
 
213
215
    def test_mkdir(self):
214
216
        t = self.get_transport()
215
217
 
375
377
                'some\nmore\nfor\nb\n'
376
378
                'from an iterator\n')
377
379
 
 
380
        if self.readonly:
 
381
            _append('c', 'some text\nfor a missing file\n')
 
382
            _append('a', 'some text in a\n')
 
383
            _append('d', 'missing file r\n')
 
384
        else:
 
385
            t.append('c', 'some text\nfor a missing file\n')
 
386
            t.append_multi([('a', 'some text in a\n'),
 
387
                            ('d', 'missing file r\n')])
 
388
        self.check_file_contents('a', 
 
389
            'diff\ncontents for\na\n'
 
390
            'add\nsome\nmore\ncontents\n'
 
391
            'and\nthen\nsome\nmore\n'
 
392
            'a little bit more\n'
 
393
            'some text in a\n')
 
394
        self.check_file_contents('c', 'some text\nfor a missing file\n')
 
395
        self.check_file_contents('d', 'missing file r\n')
 
396
 
378
397
    def test_append_file(self):
379
398
        t = self.get_transport()
380
399
 
456
475
                'some text for the\nthird file created\n'
457
476
                'some garbage\nto put in three\n')
458
477
 
 
478
        a5 = open('f2', 'rb')
 
479
        a6 = open('f2', 'rb')
 
480
        a7 = open('f3', 'rb')
 
481
        if self.readonly:
 
482
            _append('c', a5.read())
 
483
            _append('a', a6.read())
 
484
            _append('d', a7.read())
 
485
        else:
 
486
            t.append('c', a5)
 
487
            t.append_multi([('a', a6), ('d', a7)])
 
488
        del a5, a6, a7
 
489
        self.check_file_contents('c', open('f2', 'rb').read())
 
490
        self.check_file_contents('d', open('f3', 'rb').read())
 
491
 
 
492
 
459
493
    def test_delete(self):
460
494
        # TODO: Test Transport.delete
461
495
        t = self.get_transport()
462
496
 
 
497
        # Not much to do with a readonly transport
 
498
        if self.readonly:
 
499
            return
 
500
 
 
501
        open('a', 'wb').write('a little bit of text\n')
 
502
        self.failUnless(t.has('a'))
 
503
        self.failUnlessExists('a')
 
504
        t.delete('a')
 
505
        self.failIf(os.path.lexists('a'))
 
506
 
 
507
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
508
 
 
509
        open('a', 'wb').write('a text\n')
 
510
        open('b', 'wb').write('b text\n')
 
511
        open('c', 'wb').write('c text\n')
 
512
        self.assertEqual([True, True, True],
 
513
                list(t.has_multi(['a', 'b', 'c'])))
 
514
        t.delete_multi(['a', 'c'])
 
515
        self.assertEqual([False, True, False],
 
516
                list(t.has_multi(['a', 'b', 'c'])))
 
517
        self.failIf(os.path.lexists('a'))
 
518
        self.failUnlessExists('b')
 
519
        self.failIf(os.path.lexists('c'))
 
520
 
 
521
        self.assertRaises(NoSuchFile,
 
522
                t.delete_multi, ['a', 'b', 'c'])
 
523
 
 
524
        self.assertRaises(NoSuchFile,
 
525
                t.delete_multi, iter(['a', 'b', 'c']))
 
526
 
 
527
        open('a', 'wb').write('another a text\n')
 
528
        open('c', 'wb').write('another c text\n')
 
529
        t.delete_multi(iter(['a', 'b', 'c']))
 
530
 
 
531
        # We should have deleted everything
 
532
        # SftpServer creates control files in the
 
533
        # working directory, so we can just do a
 
534
        # plain "listdir".
 
535
        # self.assertEqual([], os.listdir('.'))
 
536
 
463
537
    def test_move(self):
464
 
        # TODO: Test Transport.move
465
538
        t = self.get_transport()
466
539
 
 
540
        if self.readonly:
 
541
            return
 
542
 
 
543
        # TODO: I would like to use os.listdir() to
 
544
        # make sure there are no extra files, but SftpServer
 
545
        # creates control files in the working directory
 
546
        # perhaps all of this could be done in a subdirectory
 
547
 
 
548
        open('a', 'wb').write('a first file\n')
 
549
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
550
 
 
551
        t.move('a', 'b')
 
552
        self.failUnlessExists('b')
 
553
        self.failIf(os.path.lexists('a'))
 
554
 
 
555
        self.check_file_contents('b', 'a first file\n')
 
556
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
557
 
 
558
        # Overwrite a file
 
559
        open('c', 'wb').write('c this file\n')
 
560
        t.move('c', 'b')
 
561
        self.failIf(os.path.lexists('c'))
 
562
        self.check_file_contents('b', 'c this file\n')
 
563
 
 
564
        # TODO: Try to write a test for atomicity
 
565
        # TODO: Test moving into a non-existant subdirectory
 
566
        # TODO: Test Transport.move_multi
 
567
 
467
568
    def test_copy(self):
468
 
        # TODO: Test Transport.move
469
569
        t = self.get_transport()
470
570
 
 
571
        if self.readonly:
 
572
            return
 
573
 
 
574
        open('a', 'wb').write('a file\n')
 
575
        t.copy('a', 'b')
 
576
        self.check_file_contents('b', 'a file\n')
 
577
 
 
578
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
579
        os.mkdir('c')
 
580
        # What should the assert be if you try to copy a
 
581
        # file over a directory?
 
582
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
583
        open('d', 'wb').write('text in d\n')
 
584
        t.copy('d', 'b')
 
585
        self.check_file_contents('b', 'text in d\n')
 
586
 
 
587
        # TODO: test copy_multi
 
588
 
471
589
    def test_connection_error(self):
472
590
        """ConnectionError is raised when connection is impossible"""
473
591
        if not hasattr(self, "get_bogus_transport"):
482
600
        else:
483
601
            self.failIf(True, 'Did not get the expected exception.')
484
602
 
 
603
    def test_stat(self):
 
604
        # TODO: Test stat, just try once, and if it throws, stop testing
 
605
        from stat import S_ISDIR, S_ISREG
 
606
 
 
607
        t = self.get_transport()
 
608
 
 
609
        try:
 
610
            st = t.stat('.')
 
611
        except TransportNotPossible, e:
 
612
            # This transport cannot stat
 
613
            return
 
614
 
 
615
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
616
        self.build_tree(paths)
 
617
 
 
618
        local_stats = []
 
619
 
 
620
        for p in paths:
 
621
            st = t.stat(p)
 
622
            local_st = os.stat(p)
 
623
            if p.endswith('/'):
 
624
                self.failUnless(S_ISDIR(st.st_mode))
 
625
            else:
 
626
                self.failUnless(S_ISREG(st.st_mode))
 
627
            self.assertEqual(local_st.st_size, st.st_size)
 
628
            self.assertEqual(local_st.st_mode, st.st_mode)
 
629
            local_stats.append(local_st)
 
630
 
 
631
        remote_stats = list(t.stat_multi(paths))
 
632
        remote_iter_stats = list(t.stat_multi(iter(paths)))
 
633
 
 
634
        for local, remote, remote_iter in \
 
635
            zip(local_stats, remote_stats, remote_iter_stats):
 
636
            self.assertEqual(local.st_mode, remote.st_mode)
 
637
            self.assertEqual(local.st_mode, remote_iter.st_mode)
 
638
 
 
639
            self.assertEqual(local.st_size, remote.st_size)
 
640
            self.assertEqual(local.st_size, remote_iter.st_size)
 
641
            # Should we test UID/GID?
 
642
 
 
643
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
644
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
645
 
 
646
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
 
647
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
 
648
 
 
649
    def test_list_dir(self):
 
650
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
651
        t = self.get_transport()
 
652
        
 
653
        if not t.listable():
 
654
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
655
            return
 
656
 
 
657
        # SftpServer creates control files in the working directory
 
658
        # so lets move down a directory to be safe
 
659
        os.mkdir('wd')
 
660
        os.chdir('wd')
 
661
        t = t.clone('wd')
 
662
 
 
663
        self.assertEqual([], list(t.list_dir('.')))
 
664
        self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e'])
 
665
 
 
666
        self.assertEqual(['a', 'b', 'c'], list(t.list_dir('.')))
 
667
        self.assertEqual(['d', 'e'], list(t.list_dir('c')))
 
668
 
 
669
        os.remove('c/d')
 
670
        os.remove('b')
 
671
        self.assertEqual(['a', 'c'], list(t.list_dir('.')))
 
672
        self.assertEqual(['e'], list(t.list_dir('c')))
 
673
 
 
674
        self.assertListRaises(NoSuchFile, t.list_dir, 'q')
 
675
        self.assertListRaises(NoSuchFile, t.list_dir, 'c/f')
 
676
        self.assertListRaises(NoSuchFile, t.list_dir, 'a')
 
677
 
 
678
    def test_clone(self):
 
679
        # TODO: Test that clone moves up and down the filesystem
 
680
        t1 = self.get_transport()
 
681
 
 
682
        self.build_tree(['a', 'b/', 'b/c'])
 
683
 
 
684
        self.failUnless(t1.has('a'))
 
685
        self.failUnless(t1.has('b/c'))
 
686
        self.failIf(t1.has('c'))
 
687
 
 
688
        t2 = t1.clone('b')
 
689
        self.failUnless(t2.has('c'))
 
690
        self.failIf(t2.has('a'))
 
691
 
 
692
        t3 = t2.clone('..')
 
693
        self.failUnless(t3.has('a'))
 
694
        self.failIf(t3.has('c'))
 
695
 
 
696
        self.failIf(t1.has('b/d'))
 
697
        self.failIf(t2.has('d'))
 
698
        self.failIf(t3.has('b/d'))
 
699
 
 
700
        if self.readonly:
 
701
            open('b/d', 'wb').write('newfile\n')
 
702
        else:
 
703
            t2.put('d', 'newfile\n')
 
704
 
 
705
        self.failUnless(t1.has('b/d'))
 
706
        self.failUnless(t2.has('d'))
 
707
        self.failUnless(t3.has('b/d'))
 
708
 
485
709
        
486
710
class LocalTransportTest(TestCaseInTempDir, TestTransportMixIn):
487
711
    def get_transport(self):