~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-03-28 06:58:22 UTC
  • mfrom: (2379.2.3 hpss-chroot)
  • Revision ID: pqm@pqm.ubuntu.com-20070328065822-999550a858a3ced3
(robertc) Fix chroot urls to not expose the url of the transport they are protecting, allowing regular url operations to work on them. (Robert Collins, Andrew Bennetts)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2016 Canonical Ltd
 
1
#! /usr/bin/python2.4
 
2
 
 
3
# Copyright (C) 2005 Canonical Ltd
2
4
#
3
5
# This program is free software; you can redistribute it and/or modify
4
6
# it under the terms of the GNU General Public License as published by
12
14
#
13
15
# You should have received a copy of the GNU General Public License
14
16
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
17
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
18
 
17
19
 
18
20
# TODO: tests regarding version names
19
 
# TODO: rbc 20050108 test that join does not leave an inconsistent weave
 
21
# TODO: rbc 20050108 test that join does not leave an inconsistent weave 
20
22
#       if it fails.
21
23
 
22
24
"""test suite for weave algorithm"""
28
30
    )
29
31
from bzrlib.osutils import sha_string
30
32
from bzrlib.tests import TestCase, TestCaseInTempDir
31
 
from bzrlib.weave import Weave, WeaveFormatError
 
33
from bzrlib.weave import Weave, WeaveFormatError, WeaveError, reweave
32
34
from bzrlib.weavefile import write_weave, read_weave
33
35
 
34
36
 
39
41
 
40
42
 
41
43
class TestBase(TestCase):
42
 
 
43
44
    def check_read_write(self, k):
44
45
        """Check the weave k can be written & re-read."""
45
46
        from tempfile import TemporaryFile
64
65
 
65
66
class WeaveContains(TestBase):
66
67
    """Weave __contains__ operator"""
67
 
 
68
68
    def runTest(self):
69
 
        k = Weave(get_scope=lambda:None)
 
69
        k = Weave()
70
70
        self.assertFalse('foo' in k)
71
71
        k.add_lines('foo', [], TEXT_1)
72
72
        self.assertTrue('foo' in k)
73
73
 
74
74
 
75
75
class Easy(TestBase):
76
 
 
77
76
    def runTest(self):
78
77
        k = Weave()
79
78
 
80
79
 
 
80
class StoreText(TestBase):
 
81
    """Store and retrieve a simple text."""
 
82
 
 
83
    def test_storing_text(self):
 
84
        k = Weave()
 
85
        idx = k.add_lines('text0', [], TEXT_0)
 
86
        self.assertEqual(k.get_lines(idx), TEXT_0)
 
87
        self.assertEqual(idx, 0)
 
88
 
 
89
 
81
90
class AnnotateOne(TestBase):
82
 
 
83
91
    def runTest(self):
84
92
        k = Weave()
85
93
        k.add_lines('text0', [], TEXT_0)
87
95
                         [('text0', TEXT_0[0])])
88
96
 
89
97
 
 
98
class StoreTwo(TestBase):
 
99
    def runTest(self):
 
100
        k = Weave()
 
101
 
 
102
        idx = k.add_lines('text0', [], TEXT_0)
 
103
        self.assertEqual(idx, 0)
 
104
 
 
105
        idx = k.add_lines('text1', [], TEXT_1)
 
106
        self.assertEqual(idx, 1)
 
107
 
 
108
        self.assertEqual(k.get_lines(0), TEXT_0)
 
109
        self.assertEqual(k.get_lines(1), TEXT_1)
 
110
 
 
111
 
 
112
class GetSha1(TestBase):
 
113
    def test_get_sha1(self):
 
114
        k = Weave()
 
115
        k.add_lines('text0', [], 'text0')
 
116
        self.assertEqual('34dc0e430c642a26c3dd1c2beb7a8b4f4445eb79',
 
117
                         k.get_sha1('text0'))
 
118
        self.assertRaises(errors.RevisionNotPresent,
 
119
                          k.get_sha1, 0)
 
120
        self.assertRaises(errors.RevisionNotPresent,
 
121
                          k.get_sha1, 'text1')
 
122
                        
 
123
 
90
124
class InvalidAdd(TestBase):
91
125
    """Try to use invalid version number during add."""
92
 
 
93
126
    def runTest(self):
94
127
        k = Weave()
95
128
 
102
135
 
103
136
class RepeatedAdd(TestBase):
104
137
    """Add the same version twice; harmless."""
105
 
 
106
 
    def test_duplicate_add(self):
 
138
    def runTest(self):
107
139
        k = Weave()
108
140
        idx = k.add_lines('text0', [], TEXT_0)
109
141
        idx2 = k.add_lines('text0', [], TEXT_0)
111
143
 
112
144
 
113
145
class InvalidRepeatedAdd(TestBase):
114
 
 
115
146
    def runTest(self):
116
147
        k = Weave()
117
148
        k.add_lines('basis', [], TEXT_0)
126
157
                          'text0',
127
158
                          ['basis'],         # not the right parents
128
159
                          TEXT_0)
129
 
 
 
160
        
130
161
 
131
162
class InsertLines(TestBase):
132
163
    """Store a revision that adds one line to the original.
175
206
              ['text0', 'text1', 'text3'],
176
207
              ['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
177
208
 
178
 
        self.assertEqual(k.annotate('text4'),
 
209
        self.assertEqual(k.annotate('text4'), 
179
210
                         [('text0', 'line 1'),
180
211
                          ('text4', 'aaa'),
181
212
                          ('text3', 'middle line'),
194
225
        base_text = ['one', 'two', 'three', 'four']
195
226
 
196
227
        k.add_lines('text0', [], base_text)
197
 
 
 
228
        
198
229
        texts = [['one', 'two', 'three'],
199
230
                 ['two', 'three', 'four'],
200
231
                 ['one', 'four'],
231
262
                ]
232
263
        ################################### SKIPPED
233
264
        # Weave.get doesn't trap this anymore
234
 
        return
 
265
        return 
235
266
 
236
267
        self.assertRaises(WeaveFormatError,
237
268
                          k.get_lines,
238
 
                          0)
 
269
                          0)        
239
270
 
240
271
 
241
272
class CannedDelete(TestBase):
283
314
                'line to be deleted',
284
315
                (']', 1),
285
316
                ('{', 1),
286
 
                'replacement line',
 
317
                'replacement line',                
287
318
                ('}', 1),
288
319
                'last line',
289
320
                ('}', 0),
326
357
 
327
358
        ################################### SKIPPED
328
359
        # Weave.get doesn't trap this anymore
329
 
        return
 
360
        return 
330
361
 
331
362
 
332
363
        self.assertRaises(WeaveFormatError,
404
435
                          '  added in version 1',
405
436
                          '  also from v1',
406
437
                          '}'])
407
 
 
 
438
                       
408
439
        self.assertEqual(k.get_lines(2),
409
440
                         ['foo {',
410
441
                          '  added in v2',
416
447
                          '  added in v2',
417
448
                          '  also from v1',
418
449
                          '}'])
419
 
 
 
450
                         
420
451
 
421
452
class DeleteLines2(TestBase):
422
453
    """Test recording revisions that delete lines.
498
529
                ('}', 1),
499
530
                ('{', 2),
500
531
                "alternative second line",
501
 
                ('}', 2),
 
532
                ('}', 2),                
502
533
                ]
503
534
 
504
535
        k._sha1s = [sha_string('first line')
526
557
 
527
558
        text0 = ['cheddar', 'stilton', 'gruyere']
528
559
        text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
529
 
 
 
560
        
530
561
        k.add_lines('text0', [], text0)
531
562
        k.add_lines('text1', ['text0'], text1)
532
563
 
538
569
 
539
570
class Merge(TestBase):
540
571
    """Storage of versions that merge diverged parents"""
541
 
 
542
572
    def runTest(self):
543
573
        k = Weave()
544
574
 
588
618
 
589
619
        merged = k.merge([1, 2])
590
620
 
591
 
        self.assertEqual([[['aaa']],
 
621
        self.assertEquals([[['aaa']],
592
622
                           [['111'], ['222']],
593
623
                           [['bbb']]])
594
624
 
615
645
            A Jug of Wine, a Loaf of Bread, -- and Thou
616
646
            Beside me singing in the Wilderness --
617
647
            Oh, Wilderness were Paradise enow!""",
618
 
 
 
648
            
619
649
            """A Book of Verses underneath the Bough,
620
650
            A Jug of Wine, a Loaf of Bread, -- and Thou
621
651
            Beside me singing in the Wilderness --
655
685
 
656
686
 
657
687
class JoinWeavesTests(TestBase):
658
 
 
659
688
    def setUp(self):
660
689
        super(JoinWeavesTests, self).setUp()
661
690
        self.weave1 = Weave()
664
693
        self.weave1.add_lines('v1', [], self.lines1)
665
694
        self.weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
666
695
        self.weave1.add_lines('v3', ['v2'], self.lines3)
 
696
        
 
697
    def test_join_empty(self):
 
698
        """Join two empty weaves."""
 
699
        eq = self.assertEqual
 
700
        w1 = Weave()
 
701
        w2 = Weave()
 
702
        w1.join(w2)
 
703
        eq(len(w1), 0)
 
704
        
 
705
    def test_join_empty_to_nonempty(self):
 
706
        """Join empty weave onto nonempty."""
 
707
        self.weave1.join(Weave())
 
708
        self.assertEqual(len(self.weave1), 3)
 
709
 
 
710
    def test_join_unrelated(self):
 
711
        """Join two weaves with no history in common."""
 
712
        wb = Weave()
 
713
        wb.add_lines('b1', [], ['line from b\n'])
 
714
        w1 = self.weave1
 
715
        w1.join(wb)
 
716
        eq = self.assertEqual
 
717
        eq(len(w1), 4)
 
718
        eq(sorted(w1.versions()),
 
719
           ['b1', 'v1', 'v2', 'v3'])
 
720
 
 
721
    def test_join_related(self):
 
722
        wa = self.weave1.copy()
 
723
        wb = self.weave1.copy()
 
724
        wa.add_lines('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
 
725
        wb.add_lines('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
 
726
        eq = self.assertEquals
 
727
        eq(len(wa), 4)
 
728
        eq(len(wb), 4)
 
729
        wa.join(wb)
 
730
        eq(len(wa), 5)
 
731
        eq(wa.get_lines('b1'),
 
732
           ['hello\n', 'pale blue\n', 'world\n'])
 
733
 
 
734
    def test_join_parent_disagreement(self):
 
735
        #join reconciles differening parents into a union.
 
736
        wa = Weave()
 
737
        wb = Weave()
 
738
        wa.add_lines('v1', [], ['hello\n'])
 
739
        wb.add_lines('v0', [], [])
 
740
        wb.add_lines('v1', ['v0'], ['hello\n'])
 
741
        wa.join(wb)
 
742
        self.assertEqual(['v0'], wa.get_parents('v1'))
 
743
 
 
744
    def test_join_text_disagreement(self):
 
745
        """Cannot join weaves with different texts for a version."""
 
746
        wa = Weave()
 
747
        wb = Weave()
 
748
        wa.add_lines('v1', [], ['hello\n'])
 
749
        wb.add_lines('v1', [], ['not\n', 'hello\n'])
 
750
        self.assertRaises(WeaveError,
 
751
                          wa.join, wb)
 
752
 
 
753
    def test_join_unordered(self):
 
754
        """Join weaves where indexes differ.
 
755
        
 
756
        The source weave contains a different version at index 0."""
 
757
        wa = self.weave1.copy()
 
758
        wb = Weave()
 
759
        wb.add_lines('x1', [], ['line from x1\n'])
 
760
        wb.add_lines('v1', [], ['hello\n'])
 
761
        wb.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
762
        wa.join(wb)
 
763
        eq = self.assertEquals
 
764
        eq(sorted(wa.versions()), ['v1', 'v2', 'v3', 'x1',])
 
765
        eq(wa.get_text('x1'), 'line from x1\n')
667
766
 
668
767
    def test_written_detection(self):
669
768
        # Test detection of weave file corruption.
681
780
        write_weave(w, tmpf)
682
781
 
683
782
        # Because we are corrupting, we need to make sure we have the exact text
684
 
        self.assertEqual('# bzr weave file v5\n'
 
783
        self.assertEquals('# bzr weave file v5\n'
685
784
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
686
785
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
687
786
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
714
813
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
715
814
 
716
815
 
717
 
class TestWeave(TestCase):
718
 
 
719
 
    def test_allow_reserved_false(self):
720
 
        w = Weave('name', allow_reserved=False)
721
 
        # Add lines is checked at the WeaveFile level, not at the Weave level
722
 
        w.add_lines('name:', [], TEXT_1)
723
 
        # But get_lines is checked at this level
724
 
        self.assertRaises(errors.ReservedId, w.get_lines, 'name:')
725
 
 
726
 
    def test_allow_reserved_true(self):
727
 
        w = Weave('name', allow_reserved=True)
728
 
        w.add_lines('name:', [], TEXT_1)
729
 
        self.assertEqual(TEXT_1, w.get_lines('name:'))
730
 
 
731
 
 
732
816
class InstrumentedWeave(Weave):
733
817
    """Keep track of how many times functions are called."""
734
 
 
 
818
    
735
819
    def __init__(self, weave_name=None):
736
820
        self._extract_count = 0
737
821
        Weave.__init__(self, weave_name=weave_name)
741
825
        return Weave._extract(self, versions)
742
826
 
743
827
 
 
828
class JoinOptimization(TestCase):
 
829
    """Test that Weave.join() doesn't extract all texts, only what must be done."""
 
830
 
 
831
    def test_join(self):
 
832
        w1 = InstrumentedWeave()
 
833
        w2 = InstrumentedWeave()
 
834
 
 
835
        txt0 = ['a\n']
 
836
        txt1 = ['a\n', 'b\n']
 
837
        txt2 = ['a\n', 'c\n']
 
838
        txt3 = ['a\n', 'b\n', 'c\n']
 
839
 
 
840
        w1.add_lines('txt0', [], txt0) # extract 1a
 
841
        w2.add_lines('txt0', [], txt0) # extract 1b
 
842
        w1.add_lines('txt1', ['txt0'], txt1)# extract 2a
 
843
        w2.add_lines('txt2', ['txt0'], txt2)# extract 2b
 
844
        w1.join(w2) # extract 3a to add txt2 
 
845
        w2.join(w1) # extract 3b to add txt1 
 
846
 
 
847
        w1.add_lines('txt3', ['txt1', 'txt2'], txt3) # extract 4a 
 
848
        w2.add_lines('txt3', ['txt2', 'txt1'], txt3) # extract 4b
 
849
        # These secretly have inverted parents
 
850
 
 
851
        # This should not have to do any extractions
 
852
        w1.join(w2) # NO extract, texts already present with same parents
 
853
        w2.join(w1) # NO extract, texts already present with same parents
 
854
 
 
855
        self.assertEqual(4, w1._extract_count)
 
856
        self.assertEqual(4, w2._extract_count)
 
857
 
 
858
    def test_double_parent(self):
 
859
        # It should not be considered illegal to add
 
860
        # a revision with the same parent twice
 
861
        w1 = InstrumentedWeave()
 
862
        w2 = InstrumentedWeave()
 
863
 
 
864
        txt0 = ['a\n']
 
865
        txt1 = ['a\n', 'b\n']
 
866
        txt2 = ['a\n', 'c\n']
 
867
        txt3 = ['a\n', 'b\n', 'c\n']
 
868
 
 
869
        w1.add_lines('txt0', [], txt0)
 
870
        w2.add_lines('txt0', [], txt0)
 
871
        w1.add_lines('txt1', ['txt0'], txt1)
 
872
        w2.add_lines('txt1', ['txt0', 'txt0'], txt1)
 
873
        # Same text, effectively the same, because the
 
874
        # parent is only repeated
 
875
        w1.join(w2) # extract 3a to add txt2 
 
876
        w2.join(w1) # extract 3b to add txt1 
 
877
 
 
878
 
744
879
class TestNeedsReweave(TestCase):
745
880
    """Internal corner cases for when reweave is needed."""
746
881
 
760
895
 
761
896
 
762
897
class TestWeaveFile(TestCaseInTempDir):
763
 
 
 
898
    
764
899
    def test_empty_file(self):
765
900
        f = open('empty.weave', 'wb+')
766
901
        try: