~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

[merge] bzr.dev 1491

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
import os
19
19
from cStringIO import StringIO
20
20
 
21
 
from bzrlib.errors import (NoSuchFile, FileExists, TransportNotPossible,
22
 
                           ConnectionError)
 
21
from bzrlib.errors import (NoSuchFile, FileExists,
 
22
                           TransportNotPossible, ConnectionError)
23
23
from bzrlib.tests import TestCase, TestCaseInTempDir
24
24
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
25
25
from bzrlib.transport import memory, urlescape
56
56
        """
57
57
        raise NotImplementedError
58
58
 
 
59
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
60
        """Many transport functions can return generators this makes sure
 
61
        to wrap them in a list() call to make sure the whole generator
 
62
        is run, and that the proper exception is raised.
 
63
        """
 
64
        try:
 
65
            list(func(*args, **kwargs))
 
66
        except excClass:
 
67
            return
 
68
        else:
 
69
            if hasattr(excClass,'__name__'): excName = excClass.__name__
 
70
            else: excName = str(excClass)
 
71
            raise self.failureException, "%s not raised" % excName
 
72
 
59
73
    def test_has(self):
60
74
        t = self.get_transport()
61
75
 
62
76
        files = ['a', 'b', 'e', 'g', '%']
63
77
        self.build_tree(files)
64
 
        self.assertEqual(t.has('a'), True)
65
 
        self.assertEqual(t.has('c'), False)
66
 
        self.assertEqual(t.has(urlescape('%')), True)
 
78
        self.assertEqual(True, t.has('a'))
 
79
        self.assertEqual(False, t.has('c'))
 
80
        self.assertEqual(True, t.has(urlescape('%')))
67
81
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
68
82
                [True, True, False, False, True, False, True, False])
69
 
        self.assertEqual(t.has_any(['a', 'b', 'c']), True)
70
 
        self.assertEqual(t.has_any(['c', 'd', 'f', urlescape('%%')]), False)
 
83
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
84
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlescape('%%')]))
71
85
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
72
86
                [True, True, False, False, True, False, True, False])
73
 
        self.assertEqual(t.has_any(['c', 'c', 'c']), False)
74
 
        self.assertEqual(t.has_any(['b', 'b', 'b']), True)
 
87
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
88
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
75
89
 
76
90
    def test_get(self):
77
91
        t = self.get_transport()
78
92
 
79
93
        files = ['a', 'b', 'e', 'g']
80
94
        self.build_tree(files)
81
 
        self.assertEqual(t.get('a').read(), open('a').read())
 
95
        self.assertEqual(open('a', 'rb').read(), t.get('a').read())
82
96
        content_f = t.get_multi(files)
83
97
        for path,f in zip(files, content_f):
84
98
            self.assertEqual(open(path).read(), f.read())
85
99
 
86
100
        content_f = t.get_multi(iter(files))
87
101
        for path,f in zip(files, content_f):
88
 
            self.assertEqual(open(path).read(), f.read())
 
102
            self.assertEqual(f.read(), open(path).read())
89
103
 
90
104
        self.assertRaises(NoSuchFile, t.get, 'c')
91
 
        try:
92
 
            files = list(t.get_multi(['a', 'b', 'c']))
93
 
        except NoSuchFile:
94
 
            pass
95
 
        else:
96
 
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
97
 
        try:
98
 
            files = list(t.get_multi(iter(['a', 'b', 'c', 'e'])))
99
 
        except NoSuchFile:
100
 
            pass
101
 
        else:
102
 
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
105
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
 
106
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
103
107
 
104
108
    def test_put(self):
105
109
        t = self.get_transport()
208
212
        self.check_file_contents('f5', 'here is some text\nand a bit more\n')
209
213
        self.check_file_contents('f6', 'some text for the\nthird file created\n')
210
214
 
211
 
 
212
 
 
213
215
    def test_mkdir(self):
214
216
        t = self.get_transport()
215
217
 
287
289
        files = ['a', 'b', 'c', 'd']
288
290
        self.build_tree(files)
289
291
 
290
 
        dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
 
292
        dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
291
293
        dtmp_base = os.path.basename(dtmp)
292
294
        local_t = LocalTransport(dtmp)
293
295
 
307
309
 
308
310
        del dtmp, dtmp_base, local_t
309
311
 
310
 
        dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
 
312
        dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
311
313
        dtmp_base = os.path.basename(dtmp)
312
314
        local_t = LocalTransport(dtmp)
313
315
 
375
377
                'some\nmore\nfor\nb\n'
376
378
                'from an iterator\n')
377
379
 
 
380
        if self.readonly:
 
381
            _append('c', 'some text\nfor a missing file\n')
 
382
            _append('a', 'some text in a\n')
 
383
            _append('d', 'missing file r\n')
 
384
        else:
 
385
            t.append('c', 'some text\nfor a missing file\n')
 
386
            t.append_multi([('a', 'some text in a\n'),
 
387
                            ('d', 'missing file r\n')])
 
388
        self.check_file_contents('a', 
 
389
            'diff\ncontents for\na\n'
 
390
            'add\nsome\nmore\ncontents\n'
 
391
            'and\nthen\nsome\nmore\n'
 
392
            'a little bit more\n'
 
393
            'some text in a\n')
 
394
        self.check_file_contents('c', 'some text\nfor a missing file\n')
 
395
        self.check_file_contents('d', 'missing file r\n')
 
396
 
378
397
    def test_append_file(self):
379
398
        t = self.get_transport()
380
399
 
456
475
                'some text for the\nthird file created\n'
457
476
                'some garbage\nto put in three\n')
458
477
 
 
478
        a5 = open('f2', 'rb')
 
479
        a6 = open('f2', 'rb')
 
480
        a7 = open('f3', 'rb')
 
481
        if self.readonly:
 
482
            _append('c', a5.read())
 
483
            _append('a', a6.read())
 
484
            _append('d', a7.read())
 
485
        else:
 
486
            t.append('c', a5)
 
487
            t.append_multi([('a', a6), ('d', a7)])
 
488
        del a5, a6, a7
 
489
        self.check_file_contents('c', open('f2', 'rb').read())
 
490
        self.check_file_contents('d', open('f3', 'rb').read())
 
491
 
 
492
 
459
493
    def test_delete(self):
460
494
        # TODO: Test Transport.delete
461
 
        pass
 
495
        t = self.get_transport()
 
496
 
 
497
        # Not much to do with a readonly transport
 
498
        if self.readonly:
 
499
            return
 
500
 
 
501
        open('a', 'wb').write('a little bit of text\n')
 
502
        self.failUnless(t.has('a'))
 
503
        self.failUnlessExists('a')
 
504
        t.delete('a')
 
505
        self.failIf(os.path.lexists('a'))
 
506
 
 
507
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
508
 
 
509
        open('a', 'wb').write('a text\n')
 
510
        open('b', 'wb').write('b text\n')
 
511
        open('c', 'wb').write('c text\n')
 
512
        self.assertEqual([True, True, True],
 
513
                list(t.has_multi(['a', 'b', 'c'])))
 
514
        t.delete_multi(['a', 'c'])
 
515
        self.assertEqual([False, True, False],
 
516
                list(t.has_multi(['a', 'b', 'c'])))
 
517
        self.failIf(os.path.lexists('a'))
 
518
        self.failUnlessExists('b')
 
519
        self.failIf(os.path.lexists('c'))
 
520
 
 
521
        self.assertRaises(NoSuchFile,
 
522
                t.delete_multi, ['a', 'b', 'c'])
 
523
 
 
524
        self.assertRaises(NoSuchFile,
 
525
                t.delete_multi, iter(['a', 'b', 'c']))
 
526
 
 
527
        open('a', 'wb').write('another a text\n')
 
528
        open('c', 'wb').write('another c text\n')
 
529
        t.delete_multi(iter(['a', 'b', 'c']))
 
530
 
 
531
        # We should have deleted everything
 
532
        # SftpServer creates control files in the
 
533
        # working directory, so we can just do a
 
534
        # plain "listdir".
 
535
        # self.assertEqual([], os.listdir('.'))
462
536
 
463
537
    def test_move(self):
464
 
        # TODO: Test Transport.move
465
 
        pass
 
538
        t = self.get_transport()
 
539
 
 
540
        if self.readonly:
 
541
            return
 
542
 
 
543
        # TODO: I would like to use os.listdir() to
 
544
        # make sure there are no extra files, but SftpServer
 
545
        # creates control files in the working directory
 
546
        # perhaps all of this could be done in a subdirectory
 
547
 
 
548
        open('a', 'wb').write('a first file\n')
 
549
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
550
 
 
551
        t.move('a', 'b')
 
552
        self.failUnlessExists('b')
 
553
        self.failIf(os.path.lexists('a'))
 
554
 
 
555
        self.check_file_contents('b', 'a first file\n')
 
556
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
557
 
 
558
        # Overwrite a file
 
559
        open('c', 'wb').write('c this file\n')
 
560
        t.move('c', 'b')
 
561
        self.failIf(os.path.lexists('c'))
 
562
        self.check_file_contents('b', 'c this file\n')
 
563
 
 
564
        # TODO: Try to write a test for atomicity
 
565
        # TODO: Test moving into a non-existant subdirectory
 
566
        # TODO: Test Transport.move_multi
 
567
 
 
568
    def test_copy(self):
 
569
        t = self.get_transport()
 
570
 
 
571
        if self.readonly:
 
572
            return
 
573
 
 
574
        open('a', 'wb').write('a file\n')
 
575
        t.copy('a', 'b')
 
576
        self.check_file_contents('b', 'a file\n')
 
577
 
 
578
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
579
        os.mkdir('c')
 
580
        # What should the assert be if you try to copy a
 
581
        # file over a directory?
 
582
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
583
        open('d', 'wb').write('text in d\n')
 
584
        t.copy('d', 'b')
 
585
        self.check_file_contents('b', 'text in d\n')
 
586
 
 
587
        # TODO: test copy_multi
466
588
 
467
589
    def test_connection_error(self):
468
590
        """ConnectionError is raised when connection is impossible"""
478
600
        else:
479
601
            self.failIf(True, 'Did not get the expected exception.')
480
602
 
 
603
    def test_stat(self):
 
604
        # TODO: Test stat, just try once, and if it throws, stop testing
 
605
        from stat import S_ISDIR, S_ISREG
 
606
 
 
607
        t = self.get_transport()
 
608
 
 
609
        try:
 
610
            st = t.stat('.')
 
611
        except TransportNotPossible, e:
 
612
            # This transport cannot stat
 
613
            return
 
614
 
 
615
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
616
        self.build_tree(paths)
 
617
 
 
618
        local_stats = []
 
619
 
 
620
        for p in paths:
 
621
            st = t.stat(p)
 
622
            local_st = os.stat(p)
 
623
            if p.endswith('/'):
 
624
                self.failUnless(S_ISDIR(st.st_mode))
 
625
            else:
 
626
                self.failUnless(S_ISREG(st.st_mode))
 
627
            self.assertEqual(local_st.st_size, st.st_size)
 
628
            self.assertEqual(local_st.st_mode, st.st_mode)
 
629
            local_stats.append(local_st)
 
630
 
 
631
        remote_stats = list(t.stat_multi(paths))
 
632
        remote_iter_stats = list(t.stat_multi(iter(paths)))
 
633
 
 
634
        for local, remote, remote_iter in \
 
635
            zip(local_stats, remote_stats, remote_iter_stats):
 
636
            self.assertEqual(local.st_mode, remote.st_mode)
 
637
            self.assertEqual(local.st_mode, remote_iter.st_mode)
 
638
 
 
639
            self.assertEqual(local.st_size, remote.st_size)
 
640
            self.assertEqual(local.st_size, remote_iter.st_size)
 
641
            # Should we test UID/GID?
 
642
 
 
643
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
644
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
645
 
 
646
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
 
647
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
 
648
 
 
649
    def test_list_dir(self):
 
650
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
651
        t = self.get_transport()
 
652
        
 
653
        if not t.listable():
 
654
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
655
            return
 
656
 
 
657
        def sorted_list(d):
 
658
            l = list(t.list_dir(d))
 
659
            l.sort()
 
660
            return l
 
661
 
 
662
        # SftpServer creates control files in the working directory
 
663
        # so lets move down a directory to be safe
 
664
        os.mkdir('wd')
 
665
        os.chdir('wd')
 
666
        t = t.clone('wd')
 
667
 
 
668
        self.assertEqual([], sorted_list(u'.'))
 
669
        self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e'])
 
670
 
 
671
        self.assertEqual([u'a', u'b', u'c'], sorted_list(u'.'))
 
672
        self.assertEqual([u'd', u'e'], sorted_list(u'c'))
 
673
 
 
674
        os.remove('c/d')
 
675
        os.remove('b')
 
676
        self.assertEqual([u'a', u'c'], sorted_list('.'))
 
677
        self.assertEqual([u'e'], sorted_list(u'c'))
 
678
 
 
679
        self.assertListRaises(NoSuchFile, t.list_dir, 'q')
 
680
        self.assertListRaises(NoSuchFile, t.list_dir, 'c/f')
 
681
        self.assertListRaises(NoSuchFile, t.list_dir, 'a')
 
682
 
 
683
    def test_clone(self):
 
684
        # TODO: Test that clone moves up and down the filesystem
 
685
        t1 = self.get_transport()
 
686
 
 
687
        self.build_tree(['a', 'b/', 'b/c'])
 
688
 
 
689
        self.failUnless(t1.has('a'))
 
690
        self.failUnless(t1.has('b/c'))
 
691
        self.failIf(t1.has('c'))
 
692
 
 
693
        t2 = t1.clone('b')
 
694
        self.failUnless(t2.has('c'))
 
695
        self.failIf(t2.has('a'))
 
696
 
 
697
        t3 = t2.clone('..')
 
698
        self.failUnless(t3.has('a'))
 
699
        self.failIf(t3.has('c'))
 
700
 
 
701
        self.failIf(t1.has('b/d'))
 
702
        self.failIf(t2.has('d'))
 
703
        self.failIf(t3.has('b/d'))
 
704
 
 
705
        if self.readonly:
 
706
            open('b/d', 'wb').write('newfile\n')
 
707
        else:
 
708
            t2.put('d', 'newfile\n')
 
709
 
 
710
        self.failUnless(t1.has('b/d'))
 
711
        self.failUnless(t2.has('d'))
 
712
        self.failUnless(t3.has('b/d'))
 
713
 
481
714
        
482
715
class LocalTransportTest(TestCaseInTempDir, TestTransportMixIn):
483
716
    def get_transport(self):
484
717
        from bzrlib.transport.local import LocalTransport
485
 
        return LocalTransport('.')
 
718
        return LocalTransport(u'.')
486
719
 
487
720
 
488
721
class HttpTransportTest(TestCaseWithWebserver, TestTransportMixIn):
491
724
 
492
725
    def get_transport(self):
493
726
        from bzrlib.transport.http import HttpTransport
494
 
        url = self.get_remote_url('.')
 
727
        url = self.get_remote_url(u'.')
495
728
        return HttpTransport(url)
496
729
 
497
730
    def get_bogus_transport(self):