~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

[merge] bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
import os
19
19
from cStringIO import StringIO
20
20
 
21
 
from bzrlib.errors import (NoSuchFile, FileExists, TransportNotPossible,
22
 
                           ConnectionError)
 
21
from bzrlib.errors import (NoSuchFile, FileExists,
 
22
                           TransportNotPossible, ConnectionError)
23
23
from bzrlib.tests import TestCase, TestCaseInTempDir
24
24
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
25
25
from bzrlib.transport import memory, urlescape
57
57
        """
58
58
        raise NotImplementedError
59
59
 
 
60
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
61
        """Many transport functions can return generators this makes sure
 
62
        to wrap them in a list() call to make sure the whole generator
 
63
        is run, and that the proper exception is raised.
 
64
        """
 
65
        try:
 
66
            list(func(*args, **kwargs))
 
67
        except excClass:
 
68
            return
 
69
        else:
 
70
            if hasattr(excClass,'__name__'): excName = excClass.__name__
 
71
            else: excName = str(excClass)
 
72
            raise self.failureException, "%s not raised" % excName
 
73
 
60
74
    def test_has(self):
61
75
        t = self.get_transport()
62
76
 
63
77
        files = ['a', 'b', 'e', 'g', '%']
64
78
        self.build_tree(files)
65
 
        self.assertEqual(t.has('a'), True)
66
 
        self.assertEqual(t.has('c'), False)
67
 
        self.assertEqual(t.has(urlescape('%')), True)
 
79
        self.assertEqual(True, t.has('a'))
 
80
        self.assertEqual(False, t.has('c'))
 
81
        self.assertEqual(True, t.has(urlescape('%')))
68
82
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
69
83
                [True, True, False, False, True, False, True, False])
70
 
        self.assertEqual(t.has_any(['a', 'b', 'c']), True)
71
 
        self.assertEqual(t.has_any(['c', 'd', 'f', urlescape('%%')]), False)
 
84
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
85
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlescape('%%')]))
72
86
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
73
87
                [True, True, False, False, True, False, True, False])
74
 
        self.assertEqual(t.has_any(['c', 'c', 'c']), False)
75
 
        self.assertEqual(t.has_any(['b', 'b', 'b']), True)
 
88
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
89
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
76
90
 
77
91
    def test_get(self):
78
92
        t = self.get_transport()
79
93
 
80
94
        files = ['a', 'b', 'e', 'g']
81
95
        self.build_tree(files)
82
 
        self.assertEqual(t.get('a').read(), open('a', 'rb').read())
 
96
        self.assertEqual(open('a', 'rb').read(), t.get('a').read())
83
97
        content_f = t.get_multi(files)
84
98
        for path,f in zip(files, content_f):
85
99
            self.assertEqual(f.read(), open(path, 'rb').read())
89
103
            self.assertEqual(f.read(), open(path, 'rb').read())
90
104
 
91
105
        self.assertRaises(NoSuchFile, t.get, 'c')
92
 
        try:
93
 
            files = list(t.get_multi(['a', 'b', 'c']))
94
 
        except NoSuchFile:
95
 
            pass
96
 
        else:
97
 
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
98
 
        try:
99
 
            files = list(t.get_multi(iter(['a', 'b', 'c', 'e'])))
100
 
        except NoSuchFile:
101
 
            pass
102
 
        else:
103
 
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
106
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
 
107
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
104
108
 
105
109
    def test_put(self):
106
110
        t = self.get_transport()
209
213
        self.check_file_contents('f5', 'here is some text\nand a bit more\n')
210
214
        self.check_file_contents('f6', 'some text for the\nthird file created\n')
211
215
 
212
 
 
213
 
 
214
216
    def test_mkdir(self):
215
217
        t = self.get_transport()
216
218
 
376
378
                'some\nmore\nfor\nb\n'
377
379
                'from an iterator\n')
378
380
 
 
381
        if self.readonly:
 
382
            _append('c', 'some text\nfor a missing file\n')
 
383
            _append('a', 'some text in a\n')
 
384
            _append('d', 'missing file r\n')
 
385
        else:
 
386
            t.append('c', 'some text\nfor a missing file\n')
 
387
            t.append_multi([('a', 'some text in a\n'),
 
388
                            ('d', 'missing file r\n')])
 
389
        self.check_file_contents('a', 
 
390
            'diff\ncontents for\na\n'
 
391
            'add\nsome\nmore\ncontents\n'
 
392
            'and\nthen\nsome\nmore\n'
 
393
            'a little bit more\n'
 
394
            'some text in a\n')
 
395
        self.check_file_contents('c', 'some text\nfor a missing file\n')
 
396
        self.check_file_contents('d', 'missing file r\n')
 
397
 
379
398
    def test_append_file(self):
380
399
        t = self.get_transport()
381
400
 
457
476
                'some text for the\nthird file created\n'
458
477
                'some garbage\nto put in three\n')
459
478
 
 
479
        a5 = open('f2', 'rb')
 
480
        a6 = open('f2', 'rb')
 
481
        a7 = open('f3', 'rb')
 
482
        if self.readonly:
 
483
            _append('c', a5.read())
 
484
            _append('a', a6.read())
 
485
            _append('d', a7.read())
 
486
        else:
 
487
            t.append('c', a5)
 
488
            t.append_multi([('a', a6), ('d', a7)])
 
489
        del a5, a6, a7
 
490
        self.check_file_contents('c', open('f2', 'rb').read())
 
491
        self.check_file_contents('d', open('f3', 'rb').read())
 
492
 
 
493
 
460
494
    def test_delete(self):
461
495
        # TODO: Test Transport.delete
462
496
        t = self.get_transport()
463
497
 
 
498
        # Not much to do with a readonly transport
 
499
        if self.readonly:
 
500
            return
 
501
 
 
502
        open('a', 'wb').write('a little bit of text\n')
 
503
        self.failUnless(t.has('a'))
 
504
        self.failUnlessExists('a')
 
505
        t.delete('a')
 
506
        self.failIf(os.path.lexists('a'))
 
507
 
 
508
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
509
 
 
510
        open('a', 'wb').write('a text\n')
 
511
        open('b', 'wb').write('b text\n')
 
512
        open('c', 'wb').write('c text\n')
 
513
        self.assertEqual([True, True, True],
 
514
                list(t.has_multi(['a', 'b', 'c'])))
 
515
        t.delete_multi(['a', 'c'])
 
516
        self.assertEqual([False, True, False],
 
517
                list(t.has_multi(['a', 'b', 'c'])))
 
518
        self.failIf(os.path.lexists('a'))
 
519
        self.failUnlessExists('b')
 
520
        self.failIf(os.path.lexists('c'))
 
521
 
 
522
        self.assertRaises(NoSuchFile,
 
523
                t.delete_multi, ['a', 'b', 'c'])
 
524
 
 
525
        self.assertRaises(NoSuchFile,
 
526
                t.delete_multi, iter(['a', 'b', 'c']))
 
527
 
 
528
        open('a', 'wb').write('another a text\n')
 
529
        open('c', 'wb').write('another c text\n')
 
530
        t.delete_multi(iter(['a', 'b', 'c']))
 
531
 
 
532
        # We should have deleted everything
 
533
        # SftpServer creates control files in the
 
534
        # working directory, so we can just do a
 
535
        # plain "listdir".
 
536
        # self.assertEqual([], os.listdir('.'))
 
537
 
464
538
    def test_move(self):
465
 
        # TODO: Test Transport.move
466
539
        t = self.get_transport()
467
540
 
 
541
        if self.readonly:
 
542
            return
 
543
 
 
544
        # TODO: I would like to use os.listdir() to
 
545
        # make sure there are no extra files, but SftpServer
 
546
        # creates control files in the working directory
 
547
        # perhaps all of this could be done in a subdirectory
 
548
 
 
549
        open('a', 'wb').write('a first file\n')
 
550
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
551
 
 
552
        t.move('a', 'b')
 
553
        self.failUnlessExists('b')
 
554
        self.failIf(os.path.lexists('a'))
 
555
 
 
556
        self.check_file_contents('b', 'a first file\n')
 
557
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
558
 
 
559
        # Overwrite a file
 
560
        open('c', 'wb').write('c this file\n')
 
561
        t.move('c', 'b')
 
562
        self.failIf(os.path.lexists('c'))
 
563
        self.check_file_contents('b', 'c this file\n')
 
564
 
 
565
        # TODO: Try to write a test for atomicity
 
566
        # TODO: Test moving into a non-existant subdirectory
 
567
        # TODO: Test Transport.move_multi
 
568
 
468
569
    def test_copy(self):
469
 
        # TODO: Test Transport.move
470
570
        t = self.get_transport()
471
571
 
 
572
        if self.readonly:
 
573
            return
 
574
 
 
575
        open('a', 'wb').write('a file\n')
 
576
        t.copy('a', 'b')
 
577
        self.check_file_contents('b', 'a file\n')
 
578
 
 
579
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
580
        os.mkdir('c')
 
581
        # What should the assert be if you try to copy a
 
582
        # file over a directory?
 
583
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
584
        open('d', 'wb').write('text in d\n')
 
585
        t.copy('d', 'b')
 
586
        self.check_file_contents('b', 'text in d\n')
 
587
 
 
588
        # TODO: test copy_multi
 
589
 
472
590
    def test_connection_error(self):
473
591
        """ConnectionError is raised when connection is impossible"""
474
592
        if not hasattr(self, "get_bogus_transport"):
483
601
        else:
484
602
            self.failIf(True, 'Did not get the expected exception.')
485
603
 
 
604
    def test_stat(self):
 
605
        # TODO: Test stat, just try once, and if it throws, stop testing
 
606
        from stat import S_ISDIR, S_ISREG
 
607
 
 
608
        t = self.get_transport()
 
609
 
 
610
        try:
 
611
            st = t.stat('.')
 
612
        except TransportNotPossible, e:
 
613
            # This transport cannot stat
 
614
            return
 
615
 
 
616
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
617
        self.build_tree(paths)
 
618
 
 
619
        local_stats = []
 
620
 
 
621
        for p in paths:
 
622
            st = t.stat(p)
 
623
            local_st = os.stat(p)
 
624
            if p.endswith('/'):
 
625
                self.failUnless(S_ISDIR(st.st_mode))
 
626
            else:
 
627
                self.failUnless(S_ISREG(st.st_mode))
 
628
            self.assertEqual(local_st.st_size, st.st_size)
 
629
            self.assertEqual(local_st.st_mode, st.st_mode)
 
630
            local_stats.append(local_st)
 
631
 
 
632
        remote_stats = list(t.stat_multi(paths))
 
633
        remote_iter_stats = list(t.stat_multi(iter(paths)))
 
634
 
 
635
        for local, remote, remote_iter in \
 
636
            zip(local_stats, remote_stats, remote_iter_stats):
 
637
            self.assertEqual(local.st_mode, remote.st_mode)
 
638
            self.assertEqual(local.st_mode, remote_iter.st_mode)
 
639
 
 
640
            self.assertEqual(local.st_size, remote.st_size)
 
641
            self.assertEqual(local.st_size, remote_iter.st_size)
 
642
            # Should we test UID/GID?
 
643
 
 
644
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
645
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
646
 
 
647
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
 
648
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
 
649
 
 
650
    def test_list_dir(self):
 
651
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
652
        t = self.get_transport()
 
653
        
 
654
        if not t.listable():
 
655
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
656
            return
 
657
 
 
658
        def sorted_list(d):
 
659
            l = list(t.list_dir(d))
 
660
            l.sort()
 
661
            return l
 
662
 
 
663
        # SftpServer creates control files in the working directory
 
664
        # so lets move down a directory to be safe
 
665
        os.mkdir('wd')
 
666
        os.chdir('wd')
 
667
        t = t.clone('wd')
 
668
 
 
669
        self.assertEqual([], sorted_list(u'.'))
 
670
        self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e'])
 
671
 
 
672
        self.assertEqual([u'a', u'b', u'c'], sorted_list(u'.'))
 
673
        self.assertEqual([u'd', u'e'], sorted_list(u'c'))
 
674
 
 
675
        os.remove('c/d')
 
676
        os.remove('b')
 
677
        self.assertEqual([u'a', u'c'], sorted_list('.'))
 
678
        self.assertEqual([u'e'], sorted_list(u'c'))
 
679
 
 
680
        self.assertListRaises(NoSuchFile, t.list_dir, 'q')
 
681
        self.assertListRaises(NoSuchFile, t.list_dir, 'c/f')
 
682
        self.assertListRaises(NoSuchFile, t.list_dir, 'a')
 
683
 
 
684
    def test_clone(self):
 
685
        # TODO: Test that clone moves up and down the filesystem
 
686
        t1 = self.get_transport()
 
687
 
 
688
        self.build_tree(['a', 'b/', 'b/c'])
 
689
 
 
690
        self.failUnless(t1.has('a'))
 
691
        self.failUnless(t1.has('b/c'))
 
692
        self.failIf(t1.has('c'))
 
693
 
 
694
        t2 = t1.clone('b')
 
695
        self.failUnless(t2.has('c'))
 
696
        self.failIf(t2.has('a'))
 
697
 
 
698
        t3 = t2.clone('..')
 
699
        self.failUnless(t3.has('a'))
 
700
        self.failIf(t3.has('c'))
 
701
 
 
702
        self.failIf(t1.has('b/d'))
 
703
        self.failIf(t2.has('d'))
 
704
        self.failIf(t3.has('b/d'))
 
705
 
 
706
        if self.readonly:
 
707
            open('b/d', 'wb').write('newfile\n')
 
708
        else:
 
709
            t2.put('d', 'newfile\n')
 
710
 
 
711
        self.failUnless(t1.has('b/d'))
 
712
        self.failUnless(t2.has('d'))
 
713
        self.failUnless(t3.has('b/d'))
 
714
 
486
715
        
487
716
class LocalTransportTest(TestCaseInTempDir, TestTransportMixIn):
488
717
    def get_transport(self):