3
# Copyright (C) 2005 by Canonical Ltd
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License as published by
7
# the Free Software Foundation; either version 2 of the License, or
8
# (at your option) any later version.
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
# GNU General Public License for more details.
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20
# TODO: tests regarding version names
21
# TODO: rbc 20050108 test that join does not leave an inconsistent weave
24
"""test suite for weave algorithm"""
26
from pprint import pformat
28
import bzrlib.errors as errors
29
from bzrlib.weave import Weave, WeaveFormatError, WeaveError, reweave
30
from bzrlib.weavefile import write_weave, read_weave
31
from bzrlib.tests import TestCase
32
from bzrlib.osutils import sha_string
35
# texts for use in testing
36
TEXT_0 = ["Hello world"]
37
TEXT_1 = ["Hello world",
41
class TestBase(TestCase):
42
def check_read_write(self, k):
43
"""Check the weave k can be written & re-read."""
44
from tempfile import TemporaryFile
53
self.log('serialized weave:')
57
self.log('parents: %s' % (k._parents == k2._parents))
58
self.log(' %r' % k._parents)
59
self.log(' %r' % k2._parents)
61
self.fail('read/write check failed')
64
class WeaveContains(TestBase):
65
"""Weave __contains__ operator"""
68
self.assertFalse('foo' in k)
69
k.add('foo', [], TEXT_1)
70
self.assertTrue('foo' in k)
78
class StoreText(TestBase):
79
"""Store and retrieve a simple text."""
82
idx = k.add('text0', [], TEXT_0)
83
self.assertEqual(k.get(idx), TEXT_0)
84
self.assertEqual(idx, 0)
87
class AnnotateOne(TestBase):
90
k.add('text0', [], TEXT_0)
91
self.assertEqual(k.annotate(0),
95
class StoreTwo(TestBase):
99
idx = k.add('text0', [], TEXT_0)
100
self.assertEqual(idx, 0)
102
idx = k.add('text1', [], TEXT_1)
103
self.assertEqual(idx, 1)
105
self.assertEqual(k.get(0), TEXT_0)
106
self.assertEqual(k.get(1), TEXT_1)
109
class AddWithGivenSha(TestBase):
111
"""Add with caller-supplied SHA-1"""
115
k.add('text0', [], [t], sha1=sha_string(t))
118
class GetSha1(TestBase):
119
def test_get_sha1(self):
121
k.add('text0', [], 'text0')
122
self.assertEqual('34dc0e430c642a26c3dd1c2beb7a8b4f4445eb79',
124
self.assertRaises(errors.WeaveRevisionNotPresent,
126
self.assertRaises(errors.WeaveRevisionNotPresent,
130
class InvalidAdd(TestBase):
131
"""Try to use invalid version number during add."""
135
self.assertRaises(IndexError,
142
class RepeatedAdd(TestBase):
143
"""Add the same version twice; harmless."""
146
idx = k.add('text0', [], TEXT_0)
147
idx2 = k.add('text0', [], TEXT_0)
148
self.assertEqual(idx, idx2)
151
class InvalidRepeatedAdd(TestBase):
154
idx = k.add('text0', [], TEXT_0)
155
self.assertRaises(WeaveError,
159
['not the same text'])
160
self.assertRaises(WeaveError,
163
[12], # not the right parents
167
class InsertLines(TestBase):
168
"""Store a revision that adds one line to the original.
170
Look at the annotations to make sure that the first line is matched
171
and not stored repeatedly."""
175
k.add('text0', [], ['line 1'])
176
k.add('text1', [0], ['line 1', 'line 2'])
178
self.assertEqual(k.annotate(0),
181
self.assertEqual(k.get(1),
185
self.assertEqual(k.annotate(1),
189
k.add('text2', [0], ['line 1', 'diverged line'])
191
self.assertEqual(k.annotate(2),
193
(2, 'diverged line')])
195
text3 = ['line 1', 'middle line', 'line 2']
200
# self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
202
self.log("k._weave=" + pformat(k._weave))
204
self.assertEqual(k.annotate(3),
209
# now multiple insertions at different places
212
['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
214
self.assertEqual(k.annotate(4),
223
class DeleteLines(TestBase):
224
"""Deletion of lines from existing text.
226
Try various texts all based on a common ancestor."""
230
base_text = ['one', 'two', 'three', 'four']
232
k.add('text0', [], base_text)
234
texts = [['one', 'two', 'three'],
235
['two', 'three', 'four'],
237
['one', 'two', 'three', 'four'],
242
ver = k.add('text%d' % i,
246
self.log('final weave:')
247
self.log('k._weave=' + pformat(k._weave))
249
for i in range(len(texts)):
250
self.assertEqual(k.get(i+1),
254
class SuicideDelete(TestBase):
255
"""Invalid weave which tries to add and delete simultaneously."""
261
k._weave = [('{', 0),
268
################################### SKIPPED
269
# Weave.get doesn't trap this anymore
272
self.assertRaises(WeaveFormatError,
277
class CannedDelete(TestBase):
278
"""Unpack canned weave with deleted lines."""
285
k._weave = [('{', 0),
288
'line to be deleted',
293
k._sha1s = [sha_string('first lineline to be deletedlast line')
294
, sha_string('first linelast line')]
296
self.assertEqual(k.get(0),
298
'line to be deleted',
302
self.assertEqual(k.get(1),
308
class CannedReplacement(TestBase):
309
"""Unpack canned weave with deleted lines."""
313
k._parents = [frozenset(),
316
k._weave = [('{', 0),
319
'line to be deleted',
327
k._sha1s = [sha_string('first lineline to be deletedlast line')
328
, sha_string('first linereplacement linelast line')]
330
self.assertEqual(k.get(0),
332
'line to be deleted',
336
self.assertEqual(k.get(1),
343
class BadWeave(TestBase):
344
"""Test that we trap an insert which should not occur."""
348
k._parents = [frozenset(),
350
k._weave = ['bad line',
354
' added in version 1',
363
################################### SKIPPED
364
# Weave.get doesn't trap this anymore
368
self.assertRaises(WeaveFormatError,
373
class BadInsert(TestBase):
374
"""Test that we trap an insert which should not occur."""
378
k._parents = [frozenset(),
383
k._weave = [('{', 0),
386
' added in version 1',
394
# this is not currently enforced by get
395
return ##########################################
397
self.assertRaises(WeaveFormatError,
401
self.assertRaises(WeaveFormatError,
406
class InsertNested(TestBase):
407
"""Insertion with nested instructions."""
411
k._parents = [frozenset(),
416
k._weave = [('{', 0),
419
' added in version 1',
428
k._sha1s = [sha_string('foo {}')
429
, sha_string('foo { added in version 1 also from v1}')
430
, sha_string('foo { added in v2}')
431
, sha_string('foo { added in version 1 added in v2 also from v1}')
434
self.assertEqual(k.get(0),
438
self.assertEqual(k.get(1),
440
' added in version 1',
444
self.assertEqual(k.get(2),
449
self.assertEqual(k.get(3),
451
' added in version 1',
457
class DeleteLines2(TestBase):
458
"""Test recording revisions that delete lines.
460
This relies on the weave having a way to represent lines knocked
461
out by a later revision."""
465
k.add('text0', [], ["line the first",
470
self.assertEqual(len(k.get(0)), 4)
472
k.add('text1', [0], ["line the first",
475
self.assertEqual(k.get(1),
479
self.assertEqual(k.annotate(1),
480
[(0, "line the first"),
484
class IncludeVersions(TestBase):
485
"""Check texts that are stored across multiple revisions.
487
Here we manually create a weave with particular encoding and make
488
sure it unpacks properly.
490
Text 0 includes nothing; text 1 includes text 0 and adds some
497
k._parents = [frozenset(), frozenset([0])]
498
k._weave = [('{', 0),
505
k._sha1s = [sha_string('first line')
506
, sha_string('first linesecond line')]
508
self.assertEqual(k.get(1),
512
self.assertEqual(k.get(0),
516
class DivergedIncludes(TestBase):
517
"""Weave with two diverged texts based on version 0.
522
k._parents = [frozenset(),
526
k._weave = [('{', 0),
533
"alternative second line",
537
k._sha1s = [sha_string('first line')
538
, sha_string('first linesecond line')
539
, sha_string('first linealternative second line')]
541
self.assertEqual(k.get(0),
544
self.assertEqual(k.get(1),
548
self.assertEqual(k.get(2),
550
"alternative second line"])
552
self.assertEqual(list(k.inclusions([2])),
556
class ReplaceLine(TestBase):
560
text0 = ['cheddar', 'stilton', 'gruyere']
561
text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
563
k.add('text0', [], text0)
564
k.add('text1', [0], text1)
566
self.log('k._weave=' + pformat(k._weave))
568
self.assertEqual(k.get(0), text0)
569
self.assertEqual(k.get(1), text1)
572
class Merge(TestBase):
573
"""Storage of versions that merge diverged parents"""
578
['header', '', 'line from 1'],
579
['header', '', 'line from 2', 'more from 2'],
580
['header', '', 'line from 1', 'fixup line', 'line from 2'],
583
k.add('text0', [], texts[0])
584
k.add('text1', [0], texts[1])
585
k.add('text2', [0], texts[2])
586
k.add('merge', [0, 1, 2], texts[3])
588
for i, t in enumerate(texts):
589
self.assertEqual(k.get(i), t)
591
self.assertEqual(k.annotate(3),
599
self.assertEqual(list(k.inclusions([3])),
602
self.log('k._weave=' + pformat(k._weave))
604
self.check_read_write(k)
607
class Conflicts(TestBase):
608
"""Test detection of conflicting regions during a merge.
610
A base version is inserted, then two descendents try to
611
insert different lines in the same place. These should be
612
reported as a possible conflict and forwarded to the user."""
617
k.add([], ['aaa', 'bbb'])
618
k.add([0], ['aaa', '111', 'bbb'])
619
k.add([1], ['aaa', '222', 'bbb'])
621
merged = k.merge([1, 2])
623
self.assertEquals([[['aaa']],
628
class NonConflict(TestBase):
629
"""Two descendants insert compatible changes.
631
No conflict should be reported."""
636
k.add([], ['aaa', 'bbb'])
637
k.add([0], ['111', 'aaa', 'ccc', 'bbb'])
638
k.add([1], ['aaa', 'ccc', 'bbb', '222'])
641
class AutoMerge(TestBase):
645
texts = [['header', 'aaa', 'bbb'],
646
['header', 'aaa', 'line from 1', 'bbb'],
647
['header', 'aaa', 'bbb', 'line from 2', 'more from 2'],
650
k.add('text0', [], texts[0])
651
k.add('text1', [0], texts[1])
652
k.add('text2', [0], texts[2])
654
self.log('k._weave=' + pformat(k._weave))
656
m = list(k.mash_iter([0, 1, 2]))
662
'line from 2', 'more from 2'])
665
class Khayyam(TestBase):
666
"""Test changes to multi-line texts, and read/write"""
669
"""A Book of Verses underneath the Bough,
670
A Jug of Wine, a Loaf of Bread, -- and Thou
671
Beside me singing in the Wilderness --
672
Oh, Wilderness were Paradise enow!""",
674
"""A Book of Verses underneath the Bough,
675
A Jug of Wine, a Loaf of Bread, -- and Thou
676
Beside me singing in the Wilderness --
677
Oh, Wilderness were Paradise now!""",
679
"""A Book of poems underneath the tree,
680
A Jug of Wine, a Loaf of Bread,
682
Beside me singing in the Wilderness --
683
Oh, Wilderness were Paradise now!
687
"""A Book of Verses underneath the Bough,
688
A Jug of Wine, a Loaf of Bread,
690
Beside me singing in the Wilderness --
691
Oh, Wilderness were Paradise now!""",
693
texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
699
ver = k.add('text%d' % i,
704
self.log("k._weave=" + pformat(k._weave))
706
for i, t in enumerate(texts):
707
self.assertEqual(k.get(i), t)
709
self.check_read_write(k)
712
class MergeCases(TestBase):
713
def doMerge(self, base, a, b, mp):
714
from cStringIO import StringIO
715
from textwrap import dedent
721
w.add('text0', [], map(addcrlf, base))
722
w.add('text1', [0], map(addcrlf, a))
723
w.add('text2', [0], map(addcrlf, b))
725
self.log('weave is:')
728
self.log(tmpf.getvalue())
730
self.log('merge plan:')
731
p = list(w.plan_merge(1, 2))
732
for state, line in p:
734
self.log('%12s | %s' % (state, line[:-1]))
738
mt.writelines(w.weave_merge(p))
740
self.log(mt.getvalue())
742
mp = map(addcrlf, mp)
743
self.assertEqual(mt.readlines(), mp)
746
def testOneInsert(self):
752
def testSeparateInserts(self):
    # Each side inserts one line at a different position; the expected
    # merge result contains both insertions.
    base = ['aaa', 'bbb', 'ccc']
    side_a = ['aaa', 'xxx', 'bbb', 'ccc']
    side_b = ['aaa', 'bbb', 'yyy', 'ccc']
    expected = ['aaa', 'xxx', 'bbb', 'yyy', 'ccc']
    self.doMerge(base, side_a, side_b, expected)
758
def testSameInsert(self):
    # Both sides insert 'xxx' after 'aaa'; the second side additionally
    # inserts 'yyy'.  The expected result carries 'xxx' once plus 'yyy'.
    base = ['aaa', 'bbb', 'ccc']
    side_a = ['aaa', 'xxx', 'bbb', 'ccc']
    side_b = ['aaa', 'xxx', 'bbb', 'yyy', 'ccc']
    expected = ['aaa', 'xxx', 'bbb', 'yyy', 'ccc']
    self.doMerge(base, side_a, side_b, expected)
764
def testOverlappedInsert(self):
765
self.doMerge(['aaa', 'bbb'],
766
['aaa', 'xxx', 'yyy', 'bbb'],
767
['aaa', 'xxx', 'bbb'],
768
['aaa', '<<<<<<<', 'xxx', 'yyy', '=======', 'xxx',
771
# really it ought to reduce this to
772
# ['aaa', 'xxx', 'yyy', 'bbb']
775
def testClashReplace(self):
776
self.doMerge(['aaa'],
779
['<<<<<<<', 'xxx', '=======', 'yyy', 'zzz',
782
def testNonClashInsert(self):
783
self.doMerge(['aaa'],
786
['<<<<<<<', 'xxx', 'aaa', '=======', 'yyy', 'zzz',
789
self.doMerge(['aaa'],
795
def testDeleteAndModify(self):
796
"""Clashing delete and modification.
798
If one side modifies a region and the other deletes it then
799
there should be a conflict with one side blank.
802
#######################################
803
# skipped, not working yet
806
self.doMerge(['aaa', 'bbb', 'ccc'],
807
['aaa', 'ddd', 'ccc'],
809
['<<<<<<<<', 'aaa', '=======', '>>>>>>>', 'ccc'])
812
class JoinWeavesTests(TestBase):
814
super(JoinWeavesTests, self).setUp()
815
self.weave1 = Weave()
816
self.lines1 = ['hello\n']
817
self.lines3 = ['hello\n', 'cruel\n', 'world\n']
818
self.weave1.add('v1', [], self.lines1)
819
self.weave1.add('v2', [0], ['hello\n', 'world\n'])
820
self.weave1.add('v3', [1], self.lines3)
822
def test_join_empty(self):
823
"""Join two empty weaves."""
824
eq = self.assertEqual
828
eq(w1.numversions(), 0)
830
def test_join_empty_to_nonempty(self):
    """Join empty weave onto nonempty.

    The fixture weave must still report three versions afterwards.
    """
    target = self.weave1
    target.join(Weave())
    version_count = len(target)
    self.assertEqual(version_count, 3)
835
def test_join_unrelated(self):
836
"""Join two weaves with no history in common."""
838
wb.add('b1', [], ['line from b\n'])
841
eq = self.assertEqual
843
eq(sorted(list(w1.iter_names())),
844
['b1', 'v1', 'v2', 'v3'])
846
def test_join_related(self):
847
wa = self.weave1.copy()
848
wb = self.weave1.copy()
849
wa.add('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
850
wb.add('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
851
eq = self.assertEquals
856
eq(wa.get_lines('b1'),
857
['hello\n', 'pale blue\n', 'world\n'])
859
def test_join_parent_disagreement(self):
860
"""Cannot join weaves with different parents for a version."""
863
wa.add('v1', [], ['hello\n'])
865
wb.add('v1', ['v0'], ['hello\n'])
866
self.assertRaises(WeaveError,
869
def test_join_text_disagreement(self):
870
"""Cannot join weaves with different texts for a version."""
873
wa.add('v1', [], ['hello\n'])
874
wb.add('v1', [], ['not\n', 'hello\n'])
875
self.assertRaises(WeaveError,
878
def test_join_unordered(self):
879
"""Join weaves where indexes differ.
881
The source weave contains a different version at index 0."""
882
wa = self.weave1.copy()
884
wb.add('x1', [], ['line from x1\n'])
885
wb.add('v1', [], ['hello\n'])
886
wb.add('v2', ['v1'], ['hello\n', 'world\n'])
888
eq = self.assertEquals
889
eq(sorted(wa.iter_names()), ['v1', 'v2', 'v3', 'x1',])
890
eq(wa.get_text('x1'), 'line from x1\n')
893
class Corruption(TestCase):
895
def test_detection(self):
896
# Test weaves detect corruption.
898
# Weaves contain a checksum of their texts.
899
# When a text is extracted, this checksum should be
903
w.add('v1', [], ['hello\n'])
904
w.add('v2', ['v1'], ['hello\n', 'there\n'])
906
# We are going to invasively corrupt the text
907
# Make sure the internals of weave are the same
908
self.assertEqual([('{', 0)
916
self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
917
, '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
922
w._weave[4] = 'There\n'
924
self.assertEqual('hello\n', w.get_text('v1'))
925
self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
926
self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
927
self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
928
self.assertRaises(errors.WeaveInvalidChecksum, w.check)
931
w._weave[4] = 'there\n'
932
self.assertEqual('hello\nthere\n', w.get_text('v2'))
934
#Invalid checksum, first digit changed
935
w._sha1s[1] = 'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
937
self.assertEqual('hello\n', w.get_text('v1'))
938
self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
939
self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
940
self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
941
self.assertRaises(errors.WeaveInvalidChecksum, w.check)
943
def test_written_detection(self):
944
# Test detection of weave file corruption.
946
# Make sure that we can detect if a weave file has
947
# been corrupted. This doesn't test all forms of corruption,
948
# but it at least helps verify that the data you get is what you want.
949
from cStringIO import StringIO
952
w.add('v1', [], ['hello\n'])
953
w.add('v2', ['v1'], ['hello\n', 'there\n'])
958
# Because we are corrupting, we need to make sure we have the exact text
959
self.assertEquals('# bzr weave file v5\n'
960
'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
961
'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
962
'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
965
# Change a single letter
966
tmpf = StringIO('# bzr weave file v5\n'
967
'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
968
'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
969
'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
973
self.assertEqual('hello\n', w.get_text('v1'))
974
self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
975
self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
976
self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
977
self.assertRaises(errors.WeaveInvalidChecksum, w.check)
979
# Change the sha checksum
980
tmpf = StringIO('# bzr weave file v5\n'
981
'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
982
'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
983
'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
987
self.assertEqual('hello\n', w.get_text('v1'))
988
self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
989
self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
990
self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
991
self.assertRaises(errors.WeaveInvalidChecksum, w.check)
994
class InstrumentedWeave(Weave):
    """Weave subclass that counts how often ``_extract`` is invoked."""

    def __init__(self, weave_name=None):
        # Zero the call counter before handing over to the base initialiser.
        self._extract_count = 0
        Weave.__init__(self, weave_name=weave_name)

    def _extract(self, versions):
        # Record the call first, then defer to the real implementation.
        self._extract_count = self._extract_count + 1
        extracted = Weave._extract(self, versions)
        return extracted
1006
class JoinOptimization(TestCase):
1007
"""Test that Weave.join() doesn't extract all texts, only what must be done."""
1009
def test_join(self):
1010
w1 = InstrumentedWeave()
1011
w2 = InstrumentedWeave()
1014
txt1 = ['a\n', 'b\n']
1015
txt2 = ['a\n', 'c\n']
1016
txt3 = ['a\n', 'b\n', 'c\n']
1018
w1.add('txt0', [], txt0) # extract 1a
1019
w2.add('txt0', [], txt0) # extract 1b
1020
w1.add('txt1', [0], txt1)# extract 2a
1021
w2.add('txt2', [0], txt2)# extract 2b
1022
w1.join(w2) # extract 3a to add txt2
1023
w2.join(w1) # extract 3b to add txt1
1025
w1.add('txt3', [1, 2], txt3) # extract 4a
1026
w2.add('txt3', [1, 2], txt3) # extract 4b
1027
# These secretly have inverted parents
1029
# This should not have to do any extractions
1030
w1.join(w2) # NO extract, texts already present with same parents
1031
w2.join(w1) # NO extract, texts already present with same parents
1033
self.assertEqual(4, w1._extract_count)
1034
self.assertEqual(4, w2._extract_count)
1036
def test_double_parent(self):
1037
# It should not be considered illegal to add
1038
# a revision with the same parent twice
1039
w1 = InstrumentedWeave()
1040
w2 = InstrumentedWeave()
1043
txt1 = ['a\n', 'b\n']
1044
txt2 = ['a\n', 'c\n']
1045
txt3 = ['a\n', 'b\n', 'c\n']
1047
w1.add('txt0', [], txt0)
1048
w2.add('txt0', [], txt0)
1049
w1.add('txt1', [0], txt1)
1050
w2.add('txt1', [0,0], txt1)
1051
# Same text, effectively the same, because the
1052
# parent is only repeated
1053
w1.join(w2) # extract 3a to add txt2
1054
w2.join(w1) # extract 3b to add txt1
1057
class MismatchedTexts(TestCase):
1058
"""Test that merging two weaves with different texts fails."""
1060
def test_reweave(self):
1064
w1.add('txt0', [], ['a\n'])
1065
w2.add('txt0', [], ['a\n'])
1066
w1.add('txt1', [0], ['a\n', 'b\n'])
1067
w2.add('txt1', [0], ['a\n', 'c\n'])
1069
self.assertRaises(errors.WeaveTextDiffers, w1.reweave, w2)