~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

Add RepositoryFormats and allow bzrdir.open or create _repository to be used.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
 
20
20
# TODO: tests regarding version names
21
 
 
22
 
 
 
21
# TODO: rbc 20050108 test that join does not leave an inconsistent weave 
 
22
#       if it fails.
23
23
 
24
24
"""test suite for weave algorithm"""
25
25
 
26
26
from pprint import pformat
27
27
 
28
 
from bzrlib.weave import Weave, WeaveFormatError, WeaveError
 
28
import bzrlib.errors as errors
 
29
from bzrlib.weave import Weave, WeaveFormatError, WeaveError, reweave
29
30
from bzrlib.weavefile import write_weave, read_weave
30
 
from bzrlib.selftest import TestCase
 
31
from bzrlib.tests import TestCase
31
32
from bzrlib.osutils import sha_string
32
33
 
33
34
 
37
38
          "A second line"]
38
39
 
39
40
 
40
 
 
41
41
class TestBase(TestCase):
42
42
    def check_read_write(self, k):
43
43
        """Check the weave k can be written & re-read."""
58
58
            self.log('         %r' % k._parents)
59
59
            self.log('         %r' % k2._parents)
60
60
            self.log('')
61
 
 
62
 
            
63
61
            self.fail('read/write check failed')
64
 
        
65
 
        
 
62
 
 
63
 
 
64
class WeaveContains(TestBase):
 
65
    """Weave __contains__ operator"""
 
66
    def runTest(self):
 
67
        k = Weave()
 
68
        self.assertFalse('foo' in k)
 
69
        k.add('foo', [], TEXT_1)
 
70
        self.assertTrue('foo' in k)
66
71
 
67
72
 
68
73
class Easy(TestBase):
79
84
        self.assertEqual(idx, 0)
80
85
 
81
86
 
82
 
 
83
87
class AnnotateOne(TestBase):
84
88
    def runTest(self):
85
89
        k = Weave()
102
106
        self.assertEqual(k.get(1), TEXT_1)
103
107
 
104
108
 
105
 
 
106
109
class AddWithGivenSha(TestBase):
107
110
    def runTest(self):
108
111
        """Add with caller-supplied SHA-1"""
112
115
        k.add('text0', [], [t], sha1=sha_string(t))
113
116
 
114
117
 
 
118
class GetSha1(TestBase):
 
119
    def test_get_sha1(self):
 
120
        k = Weave()
 
121
        k.add('text0', [], 'text0')
 
122
        self.assertEqual('34dc0e430c642a26c3dd1c2beb7a8b4f4445eb79',
 
123
                         k.get_sha1('text0'))
 
124
        self.assertRaises(errors.WeaveRevisionNotPresent,
 
125
                          k.get_sha1, 0)
 
126
        self.assertRaises(errors.WeaveRevisionNotPresent,
 
127
                          k.get_sha1, 'text1')
 
128
                        
115
129
 
116
130
class InvalidAdd(TestBase):
117
131
    """Try to use invalid version number during add."""
134
148
        self.assertEqual(idx, idx2)
135
149
 
136
150
 
137
 
 
138
151
class InvalidRepeatedAdd(TestBase):
139
152
    def runTest(self):
140
153
        k = Weave()
151
164
                          TEXT_0)
152
165
        
153
166
 
154
 
 
155
167
class InsertLines(TestBase):
156
168
    """Store a revision that adds one line to the original.
157
169
 
208
220
                          (4, 'ccc')])
209
221
 
210
222
 
211
 
 
212
223
class DeleteLines(TestBase):
213
224
    """Deletion of lines from existing text.
214
225
 
238
249
        for i in range(len(texts)):
239
250
            self.assertEqual(k.get(i+1),
240
251
                             texts[i])
241
 
            
242
 
 
243
252
 
244
253
 
245
254
class SuicideDelete(TestBase):
265
274
                          0)        
266
275
 
267
276
 
268
 
 
269
277
class CannedDelete(TestBase):
270
278
    """Unpack canned weave with deleted lines."""
271
279
    def runTest(self):
282
290
                'last line',
283
291
                ('}', 0),
284
292
                ]
 
293
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
294
                  , sha_string('first linelast line')]
285
295
 
286
296
        self.assertEqual(k.get(0),
287
297
                         ['first line',
295
305
                          ])
296
306
 
297
307
 
298
 
 
299
308
class CannedReplacement(TestBase):
300
309
    """Unpack canned weave with deleted lines."""
301
310
    def runTest(self):
315
324
                'last line',
316
325
                ('}', 0),
317
326
                ]
 
327
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
328
                  , sha_string('first linereplacement linelast line')]
318
329
 
319
330
        self.assertEqual(k.get(0),
320
331
                         ['first line',
329
340
                          ])
330
341
 
331
342
 
332
 
 
333
343
class BadWeave(TestBase):
334
344
    """Test that we trap an insert which should not occur."""
335
345
    def runTest(self):
415
425
                '}',
416
426
                ('}', 0)]
417
427
 
 
428
        k._sha1s = [sha_string('foo {}')
 
429
                  , sha_string('foo {  added in version 1  also from v1}')
 
430
                  , sha_string('foo {  added in v2}')
 
431
                  , sha_string('foo {  added in version 1  added in v2  also from v1}')
 
432
                  ]
 
433
 
418
434
        self.assertEqual(k.get(0),
419
435
                         ['foo {',
420
436
                          '}'])
438
454
                          '}'])
439
455
                         
440
456
 
441
 
 
442
457
class DeleteLines2(TestBase):
443
458
    """Test recording revisions that delete lines.
444
459
 
466
481
                          (0, "fine")])
467
482
 
468
483
 
469
 
 
470
484
class IncludeVersions(TestBase):
471
485
    """Check texts that are stored across multiple revisions.
472
486
 
488
502
                "second line",
489
503
                ('}', 1)]
490
504
 
 
505
        k._sha1s = [sha_string('first line')
 
506
                  , sha_string('first linesecond line')]
 
507
 
491
508
        self.assertEqual(k.get(1),
492
509
                         ["first line",
493
510
                          "second line"])
517
534
                ('}', 2),                
518
535
                ]
519
536
 
 
537
        k._sha1s = [sha_string('first line')
 
538
                  , sha_string('first linesecond line')
 
539
                  , sha_string('first linealternative second line')]
 
540
 
520
541
        self.assertEqual(k.get(0),
521
542
                         ["first line"])
522
543
 
532
553
                         [0, 2])
533
554
 
534
555
 
535
 
 
536
556
class ReplaceLine(TestBase):
537
557
    def runTest(self):
538
558
        k = Weave()
549
569
        self.assertEqual(k.get(1), text1)
550
570
 
551
571
 
552
 
 
553
572
class Merge(TestBase):
554
573
    """Storage of versions that merge diverged parents"""
555
574
    def runTest(self):
606
625
                           [['bbb']]])
607
626
 
608
627
 
609
 
 
610
628
class NonConflict(TestBase):
611
629
    """Two descendants insert compatible changes.
612
630
 
619
637
        k.add([0], ['111', 'aaa', 'ccc', 'bbb'])
620
638
        k.add([1], ['aaa', 'ccc', 'bbb', '222'])
621
639
 
622
 
    
623
 
    
624
 
 
625
640
 
626
641
class AutoMerge(TestBase):
627
642
    def runTest(self):
645
660
                          'line from 1',
646
661
                          'bbb',
647
662
                          'line from 2', 'more from 2'])
648
 
        
649
663
 
650
664
 
651
665
class Khayyam(TestBase):
695
709
        self.check_read_write(k)
696
710
 
697
711
 
698
 
 
699
712
class MergeCases(TestBase):
700
713
    def doMerge(self, base, a, b, mp):
701
714
        from cStringIO import StringIO
752
765
        self.doMerge(['aaa', 'bbb'],
753
766
                     ['aaa', 'xxx', 'yyy', 'bbb'],
754
767
                     ['aaa', 'xxx', 'bbb'],
755
 
                     ['aaa', '<<<<', 'xxx', 'yyy', '====', 'xxx', '>>>>', 'bbb'])
 
768
                     ['aaa', '<<<<<<<', 'xxx', 'yyy', '=======', 'xxx', 
 
769
                      '>>>>>>>', 'bbb'])
756
770
 
757
771
        # really it ought to reduce this to 
758
772
        # ['aaa', 'xxx', 'yyy', 'bbb']
762
776
        self.doMerge(['aaa'],
763
777
                     ['xxx'],
764
778
                     ['yyy', 'zzz'],
765
 
                     ['<<<<', 'xxx', '====', 'yyy', 'zzz', '>>>>'])
 
779
                     ['<<<<<<<', 'xxx', '=======', 'yyy', 'zzz', 
 
780
                      '>>>>>>>'])
766
781
 
767
782
    def testNonClashInsert(self):
768
783
        self.doMerge(['aaa'],
769
784
                     ['xxx', 'aaa'],
770
785
                     ['yyy', 'zzz'],
771
 
                     ['<<<<', 'xxx', 'aaa', '====', 'yyy', 'zzz', '>>>>'])
 
786
                     ['<<<<<<<', 'xxx', 'aaa', '=======', 'yyy', 'zzz', 
 
787
                      '>>>>>>>'])
772
788
 
773
789
        self.doMerge(['aaa'],
774
790
                     ['aaa'],
790
806
        self.doMerge(['aaa', 'bbb', 'ccc'],
791
807
                     ['aaa', 'ddd', 'ccc'],
792
808
                     ['aaa', 'ccc'],
793
 
                     ['<<<<', 'aaa', '====', '>>>>', 'ccc'])
 
809
                     ['<<<<<<<<', 'aaa', '=======', '>>>>>>>', 'ccc'])
794
810
 
795
811
 
796
812
class JoinWeavesTests(TestBase):
873
889
        eq(sorted(wa.iter_names()), ['v1', 'v2', 'v3', 'x1',])
874
890
        eq(wa.get_text('x1'), 'line from x1\n')
875
891
 
876
 
    def test_join_with_ghosts(self):
877
 
        """Join that inserts parents of an existing revision.
878
 
 
879
 
        This can happen when merging from another branch who
880
 
        knows about revisions the destination does not.  In 
881
 
        this test the second weave knows of an additional parent of 
882
 
        v2.  Any revisions which are in common still have to have the 
883
 
        same text."""
884
 
        return ###############################
885
 
        wa = self.weave1.copy()
886
 
        wb = Weave()
887
 
        wb.add('x1', [], ['line from x1\n'])
888
 
        wb.add('v1', [], ['hello\n'])
889
 
        wb.add('v2', ['v1', 'x1'], ['hello\n', 'world\n'])
890
 
        wa.join(wb)
891
 
        eq = self.assertEquals
892
 
        eq(sorted(wa.iter_names()), ['v1', 'v2', 'v3', 'x1',])
893
 
        eq(wa.get_text('x1'), 'line from x1\n')
894
 
 
895
 
 
896
 
if __name__ == '__main__':
897
 
    import sys
898
 
    import unittest
899
 
    sys.exit(unittest.main())
900
 
    
 
892
 
 
893
class Corruption(TestCase):
 
894
 
 
895
    def test_detection(self):
 
896
        # Test weaves detect corruption.
 
897
        #
 
898
        # Weaves contain a checksum of their texts.
 
899
        # When a text is extracted, this checksum should be
 
900
        # verified.
 
901
 
 
902
        w = Weave()
 
903
        w.add('v1', [], ['hello\n'])
 
904
        w.add('v2', ['v1'], ['hello\n', 'there\n'])
 
905
 
 
906
        # We are going to invasively corrupt the text
 
907
        # Make sure the internals of weave are the same
 
908
        self.assertEqual([('{', 0)
 
909
                        , 'hello\n'
 
910
                        , ('}', None)
 
911
                        , ('{', 1)
 
912
                        , 'there\n'
 
913
                        , ('}', None)
 
914
                        ], w._weave)
 
915
 
 
916
        self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
 
917
                        , '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
918
                        ], w._sha1s)
 
919
        w.check()
 
920
 
 
921
        # Corrupted
 
922
        w._weave[4] = 'There\n'
 
923
 
 
924
        self.assertEqual('hello\n', w.get_text('v1'))
 
925
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
926
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
927
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
 
928
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
929
 
 
930
        # Corrected
 
931
        w._weave[4] = 'there\n'
 
932
        self.assertEqual('hello\nthere\n', w.get_text('v2'))
 
933
 
 
934
        #Invalid checksum, first digit changed
 
935
        w._sha1s[1] =  'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
936
 
 
937
        self.assertEqual('hello\n', w.get_text('v1'))
 
938
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
939
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
940
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
 
941
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
942
 
 
943
    def test_written_detection(self):
 
944
        # Test detection of weave file corruption.
 
945
        #
 
946
        # Make sure that we can detect if a weave file has
 
947
        # been corrupted. This doesn't test all forms of corruption,
 
948
        # but it at least helps verify the data you get, is what you want.
 
949
        from cStringIO import StringIO
 
950
 
 
951
        w = Weave()
 
952
        w.add('v1', [], ['hello\n'])
 
953
        w.add('v2', ['v1'], ['hello\n', 'there\n'])
 
954
 
 
955
        tmpf = StringIO()
 
956
        write_weave(w, tmpf)
 
957
 
 
958
        # Because we are corrupting, we need to make sure we have the exact text
 
959
        self.assertEquals('# bzr weave file v5\n'
 
960
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
961
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
962
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
 
963
                          tmpf.getvalue())
 
964
 
 
965
        # Change a single letter
 
966
        tmpf = StringIO('# bzr weave file v5\n'
 
967
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
968
                        'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
969
                        'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
 
970
 
 
971
        w = read_weave(tmpf)
 
972
 
 
973
        self.assertEqual('hello\n', w.get_text('v1'))
 
974
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
975
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
976
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
 
977
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
978
 
 
979
        # Change the sha checksum
 
980
        tmpf = StringIO('# bzr weave file v5\n'
 
981
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
982
                        'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
983
                        'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
 
984
 
 
985
        w = read_weave(tmpf)
 
986
 
 
987
        self.assertEqual('hello\n', w.get_text('v1'))
 
988
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
989
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
990
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
 
991
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
992
 
 
993