~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport_implementations.py

  • Committer: Andrew Bennetts
  • Date: 2007-08-30 08:11:54 UTC
  • mfrom: (2766 +trunk)
  • mto: (2535.3.55 repo-refactor)
  • mto: This revision was merged to the branch mainline in revision 2772.
  • Revision ID: andrew.bennetts@canonical.com-20070830081154-16hebp2xwr15x2hc
Merge from bzr.dev.

Show diffs side-by-side

added

removed

Lines of Context:
45
45
                           )
46
46
from bzrlib.osutils import getcwd
47
47
from bzrlib.smart import medium
48
 
from bzrlib.symbol_versioning import zero_eleven
49
48
from bzrlib.tests import TestCaseInTempDir, TestScenarioApplier, TestSkipped
50
49
from bzrlib.tests.test_transport import TestTransportImplementation
51
50
from bzrlib.transport import (
230
229
 
231
230
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
232
231
 
233
 
    def test_put(self):
234
 
        t = self.get_transport()
235
 
 
236
 
        if t.is_readonly():
237
 
            return
238
 
 
239
 
        self.applyDeprecated(zero_eleven, t.put, 'a', 'string\ncontents\n')
240
 
        self.check_transport_contents('string\ncontents\n', t, 'a')
241
 
 
242
 
        self.applyDeprecated(zero_eleven,
243
 
                             t.put, 'b', StringIO('file-like\ncontents\n'))
244
 
        self.check_transport_contents('file-like\ncontents\n', t, 'b')
245
 
 
246
 
        self.assertRaises(NoSuchFile,
247
 
            self.applyDeprecated,
248
 
            zero_eleven,
249
 
            t.put, 'path/doesnt/exist/c', StringIO('contents'))
 
232
    def test_get_with_open_write_stream_sees_all_content(self):
 
233
        t = self.get_transport()
 
234
        if t.is_readonly():
 
235
            return
 
236
        handle = t.open_write_stream('foo')
 
237
        try:
 
238
            handle.write('b')
 
239
            self.assertEqual('b', t.get('foo').read())
 
240
        finally:
 
241
            handle.close()
 
242
 
 
243
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
244
        t = self.get_transport()
 
245
        if t.is_readonly():
 
246
            return
 
247
        handle = t.open_write_stream('foo')
 
248
        try:
 
249
            handle.write('b')
 
250
            self.assertEqual('b', t.get_bytes('foo'))
 
251
            self.assertEqual('b', t.get('foo').read())
 
252
        finally:
 
253
            handle.close()
250
254
 
251
255
    def test_put_bytes(self):
252
256
        t = self.get_transport()
436
440
        # Yes, you can put a file such that it becomes readonly
437
441
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
438
442
        self.assertTransportMode(t, 'mode400', 0400)
439
 
 
440
 
        # XXX: put_multi is deprecated, so do we really care anymore?
441
 
        self.applyDeprecated(zero_eleven, t.put_multi,
442
 
                             [('mmode644', StringIO('text\n'))], mode=0644)
443
 
        self.assertTransportMode(t, 'mmode644', 0644)
444
 
 
445
443
        # The default permissions should be based on the current umask
446
444
        umask = osutils.get_umask()
447
445
        t.put_file('nomode', StringIO('test text\n'), mode=None)
511
509
        unicode_file = pyStringIO(u'\u1234')
512
510
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
513
511
 
514
 
    def test_put_multi(self):
515
 
        t = self.get_transport()
516
 
 
517
 
        if t.is_readonly():
518
 
            return
519
 
        self.assertEqual(2, self.applyDeprecated(zero_eleven,
520
 
            t.put_multi, [('a', StringIO('new\ncontents for\na\n')),
521
 
                          ('d', StringIO('contents\nfor d\n'))]
522
 
            ))
523
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd'])),
524
 
                [True, False, False, True])
525
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
526
 
        self.check_transport_contents('contents\nfor d\n', t, 'd')
527
 
 
528
 
        self.assertEqual(2, self.applyDeprecated(zero_eleven,
529
 
            t.put_multi, iter([('a', StringIO('diff\ncontents for\na\n')),
530
 
                              ('d', StringIO('another contents\nfor d\n'))])
531
 
            ))
532
 
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
533
 
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
534
 
 
535
 
    def test_put_permissions(self):
536
 
        t = self.get_transport()
537
 
 
538
 
        if t.is_readonly():
539
 
            return
540
 
        if not t._can_roundtrip_unix_modebits():
541
 
            # Can't roundtrip, so no need to run this test
542
 
            return
543
 
        self.applyDeprecated(zero_eleven, t.put, 'mode644',
544
 
                             StringIO('test text\n'), mode=0644)
545
 
        self.assertTransportMode(t, 'mode644', 0644)
546
 
        self.applyDeprecated(zero_eleven, t.put, 'mode666',
547
 
                             StringIO('test text\n'), mode=0666)
548
 
        self.assertTransportMode(t, 'mode666', 0666)
549
 
        self.applyDeprecated(zero_eleven, t.put, 'mode600',
550
 
                             StringIO('test text\n'), mode=0600)
551
 
        self.assertTransportMode(t, 'mode600', 0600)
552
 
        # Yes, you can put a file such that it becomes readonly
553
 
        self.applyDeprecated(zero_eleven, t.put, 'mode400',
554
 
                             StringIO('test text\n'), mode=0400)
555
 
        self.assertTransportMode(t, 'mode400', 0400)
556
 
        self.applyDeprecated(zero_eleven, t.put_multi,
557
 
                             [('mmode644', StringIO('text\n'))], mode=0644)
558
 
        self.assertTransportMode(t, 'mmode644', 0644)
559
 
 
560
 
        # The default permissions should be based on the current umask
561
 
        umask = osutils.get_umask()
562
 
        self.applyDeprecated(zero_eleven, t.put, 'nomode',
563
 
                             StringIO('test text\n'), mode=None)
564
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
565
 
        
566
512
    def test_mkdir(self):
567
513
        t = self.get_transport()
568
514
 
633
579
        t.mkdir('dnomode', mode=None)
634
580
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
635
581
 
 
582
    def test_opening_a_file_stream_creates_file(self):
 
583
        t = self.get_transport()
 
584
        if t.is_readonly():
 
585
            return
 
586
        handle = t.open_write_stream('foo')
 
587
        try:
 
588
            self.assertEqual('', t.get_bytes('foo'))
 
589
        finally:
 
590
            handle.close()
 
591
 
 
592
    def test_opening_a_file_stream_can_set_mode(self):
 
593
        t = self.get_transport()
 
594
        if t.is_readonly():
 
595
            return
 
596
        if not t._can_roundtrip_unix_modebits():
 
597
            # Can't roundtrip, so no need to run this test
 
598
            return
 
599
        def check_mode(name, mode, expected):
 
600
            handle = t.open_write_stream(name, mode=mode)
 
601
            handle.close()
 
602
            self.assertTransportMode(t, name, expected)
 
603
        check_mode('mode644', 0644, 0644)
 
604
        check_mode('mode666', 0666, 0666)
 
605
        check_mode('mode600', 0600, 0600)
 
606
        # The default permissions should be based on the current umask
 
607
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
608
 
636
609
    def test_copy_to(self):
637
610
        # FIXME: test:   same server to same server (partly done)
638
611
        # same protocol two servers
683
656
            for f in files:
684
657
                self.assertTransportMode(temp_transport, f, mode)
685
658
 
686
 
    def test_append(self):
687
 
        t = self.get_transport()
688
 
 
689
 
        if t.is_readonly():
690
 
            return
691
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
692
 
        t.put_bytes('b', 'contents\nfor b\n')
693
 
 
694
 
        self.assertEqual(20, self.applyDeprecated(zero_eleven,
695
 
            t.append, 'a', StringIO('add\nsome\nmore\ncontents\n')))
696
 
 
697
 
        self.check_transport_contents(
698
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
699
 
            t, 'a')
700
 
 
701
 
        # And we can create new files, too
702
 
        self.assertEqual(0, self.applyDeprecated(zero_eleven,
703
 
            t.append, 'c', StringIO('some text\nfor a missing file\n')))
704
 
        self.check_transport_contents('some text\nfor a missing file\n',
705
 
                                      t, 'c')
706
659
    def test_append_file(self):
707
660
        t = self.get_transport()
708
661
 
866
819
        # plain "listdir".
867
820
        # self.assertEqual([], os.listdir('.'))
868
821
 
 
822
    def test_recommended_page_size(self):
 
823
        """Transports recommend a page size for partial access to files."""
 
824
        t = self.get_transport()
 
825
        self.assertIsInstance(t.recommended_page_size(), int)
 
826
 
869
827
    def test_rmdir(self):
870
828
        t = self.get_transport()
871
829
        # Not much to do with a readonly transport
1527
1485
        self.assertEqual(d[2], (0, '0'))
1528
1486
        self.assertEqual(d[3], (3, '34'))
1529
1487
 
 
1488
    def test_get_with_open_write_stream_sees_all_content(self):
 
1489
        t = self.get_transport()
 
1490
        if t.is_readonly():
 
1491
            return
 
1492
        handle = t.open_write_stream('foo')
 
1493
        try:
 
1494
            handle.write('bcd')
 
1495
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1496
        finally:
 
1497
            handle.close()
 
1498
 
1530
1499
    def test_get_smart_medium(self):
1531
1500
        """All transports must either give a smart medium, or know they can't.
1532
1501
        """