3
# Copyright (C) 2005 by Canonical Ltd
1
# Copyright (C) 2005 Canonical Ltd
5
3
# This program is free software; you can redistribute it and/or modify
6
4
# it under the terms of the GNU General Public License as published by
7
5
# the Free Software Foundation; either version 2 of the License, or
8
6
# (at your option) any later version.
10
8
# This program is distributed in the hope that it will be useful,
11
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
11
# GNU General Public License for more details.
15
13
# You should have received a copy of the GNU General Public License
16
14
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20
18
# TODO: tests regarding version names
19
# TODO: rbc 20050108 test that join does not leave an inconsistent weave
24
22
"""test suite for weave algorithm"""
24
from pprint import pformat
28
from bzrlib.weave import Weave, WeaveFormatError
29
from bzrlib.osutils import sha_string
30
from bzrlib.tests import TestCase, TestCaseInTempDir
31
from bzrlib.weave import Weave, WeaveFormatError, WeaveError
29
32
from bzrlib.weavefile import write_weave, read_weave
30
from pprint import pformat
37
from sets import Set, ImmutableSet
39
frozenset = ImmutableSet
44
35
# texts for use in testing
135
131
def runTest(self):
138
k.add('text0', [], ['line 1'])
139
k.add('text1', [0], ['line 1', 'line 2'])
141
self.assertEqual(k.annotate(0),
144
self.assertEqual(k.get(1),
134
k.add_lines('text0', [], ['line 1'])
135
k.add_lines('text1', ['text0'], ['line 1', 'line 2'])
137
self.assertEqual(k.annotate('text0'),
138
[('text0', 'line 1')])
140
self.assertEqual(k.get_lines(1),
148
self.assertEqual(k.annotate(1),
152
k.add('text2', [0], ['line 1', 'diverged line'])
154
self.assertEqual(k.annotate(2),
156
(2, 'diverged line')])
144
self.assertEqual(k.annotate('text1'),
145
[('text0', 'line 1'),
146
('text1', 'line 2')])
148
k.add_lines('text2', ['text0'], ['line 1', 'diverged line'])
150
self.assertEqual(k.annotate('text2'),
151
[('text0', 'line 1'),
152
('text2', 'diverged line')])
158
154
text3 = ['line 1', 'middle line', 'line 2']
163
159
# self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
165
161
self.log("k._weave=" + pformat(k._weave))
167
self.assertEqual(k.annotate(3),
163
self.assertEqual(k.annotate('text3'),
164
[('text0', 'line 1'),
165
('text3', 'middle line'),
166
('text1', 'line 2')])
172
168
# now multiple insertions at different places
170
['text0', 'text1', 'text3'],
175
171
['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
177
self.assertEqual(k.annotate(4),
173
self.assertEqual(k.annotate('text4'),
174
[('text0', 'line 1'),
176
('text3', 'middle line'),
187
182
class DeleteLines(TestBase):
538
542
['header', '', 'line from 1', 'fixup line', 'line from 2'],
541
k.add('text0', [], texts[0])
542
k.add('text1', [0], texts[1])
543
k.add('text2', [0], texts[2])
544
k.add('merge', [0, 1, 2], texts[3])
545
k.add_lines('text0', [], texts[0])
546
k.add_lines('text1', ['text0'], texts[1])
547
k.add_lines('text2', ['text0'], texts[2])
548
k.add_lines('merge', ['text0', 'text1', 'text2'], texts[3])
546
550
for i, t in enumerate(texts):
547
self.assertEqual(k.get(i), t)
551
self.assertEqual(k.get_lines(i), t)
549
self.assertEqual(k.annotate(3),
553
self.assertEqual(k.annotate('merge'),
554
[('text0', 'header'),
556
('text1', 'line from 1'),
557
('merge', 'fixup line'),
558
('text2', 'line from 2'),
557
self.assertEqual(list(k.inclusions([3])),
561
self.assertEqual(list(k.get_ancestry(['merge'])),
562
['text0', 'text1', 'text2', 'merge'])
560
564
self.log('k._weave=' + pformat(k._weave))
595
k.add([], ['aaa', 'bbb'])
596
k.add([0], ['111', 'aaa', 'ccc', 'bbb'])
597
k.add([1], ['aaa', 'ccc', 'bbb', '222'])
603
class AutoMerge(TestBase):
607
texts = [['header', 'aaa', 'bbb'],
608
['header', 'aaa', 'line from 1', 'bbb'],
609
['header', 'aaa', 'bbb', 'line from 2', 'more from 2'],
612
k.add('text0', [], texts[0])
613
k.add('text1', [0], texts[1])
614
k.add('text2', [0], texts[2])
616
self.log('k._weave=' + pformat(k._weave))
618
m = list(k.mash_iter([0, 1, 2]))
624
'line from 2', 'more from 2'])
598
k.add_lines([], ['aaa', 'bbb'])
599
k.add_lines([0], ['111', 'aaa', 'ccc', 'bbb'])
600
k.add_lines([1], ['aaa', 'ccc', 'bbb', '222'])
628
603
class Khayyam(TestBase):
629
604
"""Test changes to multi-line texts, and read/write"""
606
def test_multi_line_merge(self):
632
608
"""A Book of Verses underneath the Bough,
633
609
A Jug of Wine, a Loaf of Bread, -- and Thou
634
610
Beside me singing in the Wilderness --
635
611
Oh, Wilderness were Paradise enow!""",
637
613
"""A Book of Verses underneath the Bough,
638
614
A Jug of Wine, a Loaf of Bread, -- and Thou
639
615
Beside me singing in the Wilderness --
662
ver = k.add('text%d' % i,
638
ver = k.add_lines('text%d' % i,
663
639
list(parents), t)
640
parents.add('text%d' % i)
667
643
self.log("k._weave=" + pformat(k._weave))
669
645
for i, t in enumerate(texts):
670
self.assertEqual(k.get(i), t)
646
self.assertEqual(k.get_lines(i), t)
672
648
self.check_read_write(k)
651
class JoinWeavesTests(TestBase):
653
super(JoinWeavesTests, self).setUp()
654
self.weave1 = Weave()
655
self.lines1 = ['hello\n']
656
self.lines3 = ['hello\n', 'cruel\n', 'world\n']
657
self.weave1.add_lines('v1', [], self.lines1)
658
self.weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
659
self.weave1.add_lines('v3', ['v2'], self.lines3)
676
class MergeCases(TestBase):
677
def doMerge(self, base, a, b, mp):
661
def test_written_detection(self):
662
# Test detection of weave file corruption.
664
# Make sure that we can detect if a weave file has
665
# been corrupted. This doesn't test all forms of corruption,
666
# but it at least helps verify that the data you get is what you want.
678
667
from cStringIO import StringIO
679
from textwrap import dedent
685
w.add('text0', [], map(addcrlf, base))
686
w.add('text1', [0], map(addcrlf, a))
687
w.add('text2', [0], map(addcrlf, b))
670
w.add_lines('v1', [], ['hello\n'])
671
w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
689
self.log('weave is:')
690
673
tmpf = StringIO()
691
674
write_weave(w, tmpf)
692
self.log(tmpf.getvalue())
694
self.log('merge plan:')
695
p = list(w.plan_merge(1, 2))
696
for state, line in p:
698
self.log('%12s | %s' % (state, line[:-1]))
702
mt.writelines(w.weave_merge(p))
704
self.log(mt.getvalue())
706
mp = map(addcrlf, mp)
707
self.assertEqual(mt.readlines(), mp)
710
def testOneInsert(self):
716
def testSeparateInserts(self):
717
self.doMerge(['aaa', 'bbb', 'ccc'],
718
['aaa', 'xxx', 'bbb', 'ccc'],
719
['aaa', 'bbb', 'yyy', 'ccc'],
720
['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
722
def testSameInsert(self):
723
self.doMerge(['aaa', 'bbb', 'ccc'],
724
['aaa', 'xxx', 'bbb', 'ccc'],
725
['aaa', 'xxx', 'bbb', 'yyy', 'ccc'],
726
['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
728
def testOverlappedInsert(self):
729
self.doMerge(['aaa', 'bbb'],
730
['aaa', 'xxx', 'yyy', 'bbb'],
731
['aaa', 'xxx', 'bbb'],
732
['aaa', '<<<<', 'xxx', 'yyy', '====', 'xxx', '>>>>', 'bbb'])
734
# really it ought to reduce this to
735
# ['aaa', 'xxx', 'yyy', 'bbb']
738
def testClashReplace(self):
739
self.doMerge(['aaa'],
742
['<<<<', 'xxx', '====', 'yyy', 'zzz', '>>>>'])
744
def testNonClashInsert(self):
745
self.doMerge(['aaa'],
748
['<<<<', 'xxx', 'aaa', '====', 'yyy', 'zzz', '>>>>'])
750
self.doMerge(['aaa'],
756
def testDeleteAndModify(self):
757
"""Clashing delete and modification.
759
If one side modifies a region and the other deletes it then
760
there should be a conflict with one side blank.
763
#######################################
764
# skipped, not working yet
767
self.doMerge(['aaa', 'bbb', 'ccc'],
768
['aaa', 'ddd', 'ccc'],
770
['<<<<', 'aaa', '====', '>>>>', 'ccc'])
776
from unittest import TestSuite, TestLoader
781
suite.addTest(tl.loadTestsFromModule(testweave))
783
return int(not testsweet.run_suite(suite)) # for shell 0=true
786
if __name__ == '__main__':
788
sys.exit(testweave())
676
# Because we are corrupting, we need to make sure we have the exact text
677
self.assertEquals('# bzr weave file v5\n'
678
'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
679
'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
680
'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
683
# Change a single letter
684
tmpf = StringIO('# bzr weave file v5\n'
685
'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
686
'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
687
'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
691
self.assertEqual('hello\n', w.get_text('v1'))
692
self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
693
self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
694
self.assertRaises(errors.WeaveInvalidChecksum, w.check)
696
# Change the sha checksum
697
tmpf = StringIO('# bzr weave file v5\n'
698
'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
699
'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
700
'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
704
self.assertEqual('hello\n', w.get_text('v1'))
705
self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
706
self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
707
self.assertRaises(errors.WeaveInvalidChecksum, w.check)
710
class TestWeave(TestCase):
    """Exercise the allow_reserved flag of the Weave constructor."""

    def test_allow_reserved_false(self):
        reserved_id = 'name:'
        weave = Weave('name', allow_reserved=False)
        # add_lines is checked at the WeaveFile level, not at the Weave
        # level, so this call is made without expecting an error.
        weave.add_lines(reserved_id, [], TEXT_1)
        # But get_lines is checked at this level
        self.assertRaises(errors.ReservedId, weave.get_lines, reserved_id)

    def test_allow_reserved_true(self):
        reserved_id = 'name:'
        weave = Weave('name', allow_reserved=True)
        weave.add_lines(reserved_id, [], TEXT_1)
        # With reserved ids allowed, the stored text reads back unchanged.
        stored_lines = weave.get_lines(reserved_id)
        self.assertEqual(TEXT_1, stored_lines)
725
class InstrumentedWeave(Weave):
    """Weave subclass that counts how often _extract is invoked."""

    def __init__(self, weave_name=None):
        """Zero the call counter, then run the normal Weave constructor.

        :param weave_name: passed through to Weave.__init__ unchanged.
        """
        # The counter is set before delegating, as in the original ordering.
        self._extract_count = 0
        Weave.__init__(self, weave_name=weave_name)

    def _extract(self, versions):
        """Record one more call and delegate to Weave._extract.

        :param versions: passed through to Weave._extract unchanged.
        :return: whatever Weave._extract returns for ``versions``.
        """
        self._extract_count = self._extract_count + 1
        extracted = Weave._extract(self, versions)
        return extracted
737
class TestNeedsReweave(TestCase):
738
"""Internal corner cases for when reweave is needed."""
740
def test_compatible_parents(self):
742
my_parents = set([1, 2, 3])
744
self.assertTrue(w1._compatible_parents(my_parents, set([3])))
746
self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
747
# same empty corner case
748
self.assertTrue(w1._compatible_parents(set(), set()))
749
# other cannot contain stuff my_parents does not
750
self.assertFalse(w1._compatible_parents(set(), set([1])))
751
self.assertFalse(w1._compatible_parents(my_parents, set([1, 2, 3, 4])))
752
self.assertFalse(w1._compatible_parents(my_parents, set([4])))
755
class TestWeaveFile(TestCaseInTempDir):
757
def test_empty_file(self):
758
f = open('empty.weave', 'wb+')
760
self.assertRaises(errors.WeaveFormatError,