~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Michael Ellerman
  • Date: 2005-12-10 22:11:13 UTC
  • mto: This revision was merged to the branch mainline in revision 1528.
  • Revision ID: michael@ellerman.id.au-20051210221113-99ca561aaab4661e
Simplify handling of DivergedBranches in cmd_pull()

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
import os
19
19
from cStringIO import StringIO
20
20
 
21
 
from bzrlib.errors import NoSuchFile, FileExists, TransportNotPossible
22
 
from bzrlib.selftest import TestCase, TestCaseInTempDir
23
 
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
24
 
from bzrlib.transport import memory
 
21
from bzrlib.errors import (NoSuchFile, FileExists,
 
22
                           TransportNotPossible, ConnectionError)
 
23
from bzrlib.tests import TestCase, TestCaseInTempDir
 
24
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
 
25
from bzrlib.transport import memory, urlescape
25
26
 
26
27
 
27
28
def _append(fn, txt):
32
33
    f.close()
33
34
    del f
34
35
 
 
36
class TestTransport(TestCase):
 
37
    """Test the non transport-concrete class functionality."""
 
38
 
 
39
    def test_urlescape(self):
 
40
        self.assertEqual('%25', urlescape('%'))
 
41
 
 
42
 
35
43
class TestTransportMixIn(object):
36
44
    """Subclass this, and it will provide a series of tests for a Transport.
37
45
    It assumes that the Transport object is connected to the 
48
56
        """
49
57
        raise NotImplementedError
50
58
 
 
59
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
60
        """Many transport functions can return generators this makes sure
 
61
        to wrap them in a list() call to make sure the whole generator
 
62
        is run, and that the proper exception is raised.
 
63
        """
 
64
        try:
 
65
            list(func(*args, **kwargs))
 
66
        except excClass:
 
67
            return
 
68
        else:
 
69
            if hasattr(excClass,'__name__'): excName = excClass.__name__
 
70
            else: excName = str(excClass)
 
71
            raise self.failureException, "%s not raised" % excName
 
72
 
51
73
    def test_has(self):
52
74
        t = self.get_transport()
53
75
 
54
 
        files = ['a', 'b', 'e', 'g']
 
76
        files = ['a', 'b', 'e', 'g', '%']
55
77
        self.build_tree(files)
56
 
        self.assertEqual(t.has('a'), True)
57
 
        self.assertEqual(t.has('c'), False)
 
78
        self.assertEqual(True, t.has('a'))
 
79
        self.assertEqual(False, t.has('c'))
 
80
        self.assertEqual(True, t.has(urlescape('%')))
58
81
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
59
82
                [True, True, False, False, True, False, True, False])
 
83
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
84
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlescape('%%')]))
60
85
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
61
86
                [True, True, False, False, True, False, True, False])
 
87
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
88
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
62
89
 
63
90
    def test_get(self):
64
91
        t = self.get_transport()
65
92
 
66
93
        files = ['a', 'b', 'e', 'g']
67
94
        self.build_tree(files)
68
 
        self.assertEqual(t.get('a').read(), open('a').read())
 
95
        self.assertEqual(open('a', 'rb').read(), t.get('a').read())
69
96
        content_f = t.get_multi(files)
70
97
        for path,f in zip(files, content_f):
71
98
            self.assertEqual(open(path).read(), f.read())
72
99
 
73
100
        content_f = t.get_multi(iter(files))
74
101
        for path,f in zip(files, content_f):
75
 
            self.assertEqual(open(path).read(), f.read())
 
102
            self.assertEqual(f.read(), open(path).read())
76
103
 
77
104
        self.assertRaises(NoSuchFile, t.get, 'c')
78
 
        try:
79
 
            files = list(t.get_multi(['a', 'b', 'c']))
80
 
        except NoSuchFile:
81
 
            pass
82
 
        else:
83
 
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
84
 
        try:
85
 
            files = list(t.get_multi(iter(['a', 'b', 'c', 'e'])))
86
 
        except NoSuchFile:
87
 
            pass
88
 
        else:
89
 
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
105
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
 
106
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
90
107
 
91
108
    def test_put(self):
92
109
        t = self.get_transport()
195
212
        self.check_file_contents('f5', 'here is some text\nand a bit more\n')
196
213
        self.check_file_contents('f6', 'some text for the\nthird file created\n')
197
214
 
198
 
 
199
 
 
200
215
    def test_mkdir(self):
201
216
        t = self.get_transport()
202
217
 
274
289
        files = ['a', 'b', 'c', 'd']
275
290
        self.build_tree(files)
276
291
 
277
 
        dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
 
292
        dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
278
293
        dtmp_base = os.path.basename(dtmp)
279
294
        local_t = LocalTransport(dtmp)
280
295
 
283
298
            self.assertEquals(open(f).read(),
284
299
                    open(os.path.join(dtmp_base, f)).read())
285
300
 
 
301
        # Test that copying into a missing directory raises
 
302
        # NoSuchFile
 
303
        os.mkdir('e')
 
304
        open('e/f', 'wb').write('contents of e')
 
305
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], local_t)
 
306
 
 
307
        os.mkdir(os.path.join(dtmp_base, 'e'))
 
308
        t.copy_to(['e/f'], local_t)
 
309
 
286
310
        del dtmp, dtmp_base, local_t
287
311
 
288
 
        dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
 
312
        dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
289
313
        dtmp_base = os.path.basename(dtmp)
290
314
        local_t = LocalTransport(dtmp)
291
315
 
353
377
                'some\nmore\nfor\nb\n'
354
378
                'from an iterator\n')
355
379
 
 
380
        if self.readonly:
 
381
            _append('c', 'some text\nfor a missing file\n')
 
382
            _append('a', 'some text in a\n')
 
383
            _append('d', 'missing file r\n')
 
384
        else:
 
385
            t.append('c', 'some text\nfor a missing file\n')
 
386
            t.append_multi([('a', 'some text in a\n'),
 
387
                            ('d', 'missing file r\n')])
 
388
        self.check_file_contents('a', 
 
389
            'diff\ncontents for\na\n'
 
390
            'add\nsome\nmore\ncontents\n'
 
391
            'and\nthen\nsome\nmore\n'
 
392
            'a little bit more\n'
 
393
            'some text in a\n')
 
394
        self.check_file_contents('c', 'some text\nfor a missing file\n')
 
395
        self.check_file_contents('d', 'missing file r\n')
 
396
 
356
397
    def test_append_file(self):
357
398
        t = self.get_transport()
358
399
 
434
475
                'some text for the\nthird file created\n'
435
476
                'some garbage\nto put in three\n')
436
477
 
 
478
        a5 = open('f2', 'rb')
 
479
        a6 = open('f2', 'rb')
 
480
        a7 = open('f3', 'rb')
 
481
        if self.readonly:
 
482
            _append('c', a5.read())
 
483
            _append('a', a6.read())
 
484
            _append('d', a7.read())
 
485
        else:
 
486
            t.append('c', a5)
 
487
            t.append_multi([('a', a6), ('d', a7)])
 
488
        del a5, a6, a7
 
489
        self.check_file_contents('c', open('f2', 'rb').read())
 
490
        self.check_file_contents('d', open('f3', 'rb').read())
 
491
 
 
492
 
437
493
    def test_delete(self):
438
494
        # TODO: Test Transport.delete
439
 
        pass
 
495
        t = self.get_transport()
 
496
 
 
497
        # Not much to do with a readonly transport
 
498
        if self.readonly:
 
499
            return
 
500
 
 
501
        open('a', 'wb').write('a little bit of text\n')
 
502
        self.failUnless(t.has('a'))
 
503
        self.failUnlessExists('a')
 
504
        t.delete('a')
 
505
        self.failIf(os.path.lexists('a'))
 
506
 
 
507
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
508
 
 
509
        open('a', 'wb').write('a text\n')
 
510
        open('b', 'wb').write('b text\n')
 
511
        open('c', 'wb').write('c text\n')
 
512
        self.assertEqual([True, True, True],
 
513
                list(t.has_multi(['a', 'b', 'c'])))
 
514
        t.delete_multi(['a', 'c'])
 
515
        self.assertEqual([False, True, False],
 
516
                list(t.has_multi(['a', 'b', 'c'])))
 
517
        self.failIf(os.path.lexists('a'))
 
518
        self.failUnlessExists('b')
 
519
        self.failIf(os.path.lexists('c'))
 
520
 
 
521
        self.assertRaises(NoSuchFile,
 
522
                t.delete_multi, ['a', 'b', 'c'])
 
523
 
 
524
        self.assertRaises(NoSuchFile,
 
525
                t.delete_multi, iter(['a', 'b', 'c']))
 
526
 
 
527
        open('a', 'wb').write('another a text\n')
 
528
        open('c', 'wb').write('another c text\n')
 
529
        t.delete_multi(iter(['a', 'b', 'c']))
 
530
 
 
531
        # We should have deleted everything
 
532
        # SftpServer creates control files in the
 
533
        # working directory, so we can just do a
 
534
        # plain "listdir".
 
535
        # self.assertEqual([], os.listdir('.'))
440
536
 
441
537
    def test_move(self):
442
 
        # TODO: Test Transport.move
443
 
        pass
444
 
 
445
 
 
 
538
        t = self.get_transport()
 
539
 
 
540
        if self.readonly:
 
541
            return
 
542
 
 
543
        # TODO: I would like to use os.listdir() to
 
544
        # make sure there are no extra files, but SftpServer
 
545
        # creates control files in the working directory
 
546
        # perhaps all of this could be done in a subdirectory
 
547
 
 
548
        open('a', 'wb').write('a first file\n')
 
549
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
550
 
 
551
        t.move('a', 'b')
 
552
        self.failUnlessExists('b')
 
553
        self.failIf(os.path.lexists('a'))
 
554
 
 
555
        self.check_file_contents('b', 'a first file\n')
 
556
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
557
 
 
558
        # Overwrite a file
 
559
        open('c', 'wb').write('c this file\n')
 
560
        t.move('c', 'b')
 
561
        self.failIf(os.path.lexists('c'))
 
562
        self.check_file_contents('b', 'c this file\n')
 
563
 
 
564
        # TODO: Try to write a test for atomicity
 
565
        # TODO: Test moving into a non-existant subdirectory
 
566
        # TODO: Test Transport.move_multi
 
567
 
 
568
    def test_copy(self):
 
569
        t = self.get_transport()
 
570
 
 
571
        if self.readonly:
 
572
            return
 
573
 
 
574
        open('a', 'wb').write('a file\n')
 
575
        t.copy('a', 'b')
 
576
        self.check_file_contents('b', 'a file\n')
 
577
 
 
578
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
579
        os.mkdir('c')
 
580
        # What should the assert be if you try to copy a
 
581
        # file over a directory?
 
582
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
583
        open('d', 'wb').write('text in d\n')
 
584
        t.copy('d', 'b')
 
585
        self.check_file_contents('b', 'text in d\n')
 
586
 
 
587
        # TODO: test copy_multi
 
588
 
 
589
    def test_connection_error(self):
 
590
        """ConnectionError is raised when connection is impossible"""
 
591
        if not hasattr(self, "get_bogus_transport"):
 
592
            return
 
593
        t = self.get_bogus_transport()
 
594
        try:
 
595
            t.get('.bzr/branch')
 
596
        except (ConnectionError, NoSuchFile), e:
 
597
            pass
 
598
        except (Exception), e:
 
599
            self.failIf(True, 'Wrong exception thrown: %s' % e)
 
600
        else:
 
601
            self.failIf(True, 'Did not get the expected exception.')
 
602
 
 
603
    def test_stat(self):
 
604
        # TODO: Test stat, just try once, and if it throws, stop testing
 
605
        from stat import S_ISDIR, S_ISREG
 
606
 
 
607
        t = self.get_transport()
 
608
 
 
609
        try:
 
610
            st = t.stat('.')
 
611
        except TransportNotPossible, e:
 
612
            # This transport cannot stat
 
613
            return
 
614
 
 
615
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
616
        self.build_tree(paths)
 
617
 
 
618
        local_stats = []
 
619
 
 
620
        for p in paths:
 
621
            st = t.stat(p)
 
622
            local_st = os.stat(p)
 
623
            if p.endswith('/'):
 
624
                self.failUnless(S_ISDIR(st.st_mode))
 
625
            else:
 
626
                self.failUnless(S_ISREG(st.st_mode))
 
627
            self.assertEqual(local_st.st_size, st.st_size)
 
628
            self.assertEqual(local_st.st_mode, st.st_mode)
 
629
            local_stats.append(local_st)
 
630
 
 
631
        remote_stats = list(t.stat_multi(paths))
 
632
        remote_iter_stats = list(t.stat_multi(iter(paths)))
 
633
 
 
634
        for local, remote, remote_iter in \
 
635
            zip(local_stats, remote_stats, remote_iter_stats):
 
636
            self.assertEqual(local.st_mode, remote.st_mode)
 
637
            self.assertEqual(local.st_mode, remote_iter.st_mode)
 
638
 
 
639
            self.assertEqual(local.st_size, remote.st_size)
 
640
            self.assertEqual(local.st_size, remote_iter.st_size)
 
641
            # Should we test UID/GID?
 
642
 
 
643
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
644
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
645
 
 
646
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
 
647
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
 
648
 
 
649
    def test_list_dir(self):
 
650
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
651
        t = self.get_transport()
 
652
        
 
653
        if not t.listable():
 
654
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
655
            return
 
656
 
 
657
        def sorted_list(d):
 
658
            l = list(t.list_dir(d))
 
659
            l.sort()
 
660
            return l
 
661
 
 
662
        # SftpServer creates control files in the working directory
 
663
        # so lets move down a directory to be safe
 
664
        os.mkdir('wd')
 
665
        os.chdir('wd')
 
666
        t = t.clone('wd')
 
667
 
 
668
        self.assertEqual([], sorted_list(u'.'))
 
669
        self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e'])
 
670
 
 
671
        self.assertEqual([u'a', u'b', u'c'], sorted_list(u'.'))
 
672
        self.assertEqual([u'd', u'e'], sorted_list(u'c'))
 
673
 
 
674
        os.remove('c/d')
 
675
        os.remove('b')
 
676
        self.assertEqual([u'a', u'c'], sorted_list('.'))
 
677
        self.assertEqual([u'e'], sorted_list(u'c'))
 
678
 
 
679
        self.assertListRaises(NoSuchFile, t.list_dir, 'q')
 
680
        self.assertListRaises(NoSuchFile, t.list_dir, 'c/f')
 
681
        self.assertListRaises(NoSuchFile, t.list_dir, 'a')
 
682
 
 
683
    def test_clone(self):
 
684
        # TODO: Test that clone moves up and down the filesystem
 
685
        t1 = self.get_transport()
 
686
 
 
687
        self.build_tree(['a', 'b/', 'b/c'])
 
688
 
 
689
        self.failUnless(t1.has('a'))
 
690
        self.failUnless(t1.has('b/c'))
 
691
        self.failIf(t1.has('c'))
 
692
 
 
693
        t2 = t1.clone('b')
 
694
        self.failUnless(t2.has('c'))
 
695
        self.failIf(t2.has('a'))
 
696
 
 
697
        t3 = t2.clone('..')
 
698
        self.failUnless(t3.has('a'))
 
699
        self.failIf(t3.has('c'))
 
700
 
 
701
        self.failIf(t1.has('b/d'))
 
702
        self.failIf(t2.has('d'))
 
703
        self.failIf(t3.has('b/d'))
 
704
 
 
705
        if self.readonly:
 
706
            open('b/d', 'wb').write('newfile\n')
 
707
        else:
 
708
            t2.put('d', 'newfile\n')
 
709
 
 
710
        self.failUnless(t1.has('b/d'))
 
711
        self.failUnless(t2.has('d'))
 
712
        self.failUnless(t3.has('b/d'))
 
713
 
 
714
        
446
715
class LocalTransportTest(TestCaseInTempDir, TestTransportMixIn):
447
716
    def get_transport(self):
448
717
        from bzrlib.transport.local import LocalTransport
449
 
        return LocalTransport('.')
 
718
        return LocalTransport(u'.')
450
719
 
451
720
 
452
721
class HttpTransportTest(TestCaseWithWebserver, TestTransportMixIn):
455
724
 
456
725
    def get_transport(self):
457
726
        from bzrlib.transport.http import HttpTransport
458
 
        url = self.get_remote_url('.')
 
727
        url = self.get_remote_url(u'.')
459
728
        return HttpTransport(url)
460
729
 
 
730
    def get_bogus_transport(self):
 
731
        from bzrlib.transport.http import HttpTransport
 
732
        return HttpTransport('http://jasldkjsalkdjalksjdkljasd')
 
733
 
461
734
 
462
735
class TestMemoryTransport(TestCase):
463
736
 
548
821
        transport.put('bar', StringIO('phowar'))
549
822
        self.assertEqual(7, transport.stat('foo').st_size)
550
823
        self.assertEqual(6, transport.stat('bar').st_size)
551
 
        
 
824