59
59
self.log(' %r' % k._parents)
60
60
self.log(' %r' % k2._parents)
64
62
self.fail('read/write check failed')
65
class WeaveContains(TestBase):
66
"""Weave __contains__ operator"""
69
k = Weave(get_scope=lambda:None)
70
self.assertFalse('foo' in k)
71
k.add_lines('foo', [], TEXT_1)
72
self.assertTrue('foo' in k)
69
75
class Easy(TestBase):
74
class StoreText(TestBase):
75
"""Store and retrieve a simple text."""
78
idx = k.add('text0', [], TEXT_0)
79
self.assertEqual(k.get(idx), TEXT_0)
80
self.assertEqual(idx, 0)
84
81
class AnnotateOne(TestBase):
87
k.add('text0', [], TEXT_0)
88
self.assertEqual(k.annotate(0),
92
class StoreTwo(TestBase):
96
idx = k.add('text0', [], TEXT_0)
97
self.assertEqual(idx, 0)
99
idx = k.add('text1', [], TEXT_1)
100
self.assertEqual(idx, 1)
102
self.assertEqual(k.get(0), TEXT_0)
103
self.assertEqual(k.get(1), TEXT_1)
107
class AddWithGivenSha(TestBase):
109
"""Add with caller-supplied SHA-1"""
113
k.add('text0', [], [t], sha1=sha_string(t))
85
k.add_lines('text0', [], TEXT_0)
86
self.assertEqual(k.annotate('text0'),
87
[('text0', TEXT_0[0])])
117
90
class InvalidAdd(TestBase):
118
91
"""Try to use invalid version number during add."""
119
93
def runTest(self):
122
self.assertRaises(IndexError,
96
self.assertRaises(errors.RevisionNotPresent,
129
103
class RepeatedAdd(TestBase):
130
104
"""Add the same version twice; harmless."""
106
def test_duplicate_add(self):
133
idx = k.add('text0', [], TEXT_0)
134
idx2 = k.add('text0', [], TEXT_0)
108
idx = k.add_lines('text0', [], TEXT_0)
109
idx2 = k.add_lines('text0', [], TEXT_0)
135
110
self.assertEqual(idx, idx2)
139
113
class InvalidRepeatedAdd(TestBase):
140
115
def runTest(self):
142
idx = k.add('text0', [], TEXT_0)
143
self.assertRaises(WeaveError,
117
k.add_lines('basis', [], TEXT_0)
118
idx = k.add_lines('text0', [], TEXT_0)
119
self.assertRaises(errors.RevisionAlreadyPresent,
147
123
['not the same text'])
148
self.assertRaises(WeaveError,
124
self.assertRaises(errors.RevisionAlreadyPresent,
151
[12], # not the right parents
127
['basis'], # not the right parents
156
131
class InsertLines(TestBase):
686
ver = k.add('text%d' % i,
644
ver = k.add_lines('text%d' % i,
687
645
list(parents), t)
646
parents.add('text%d' % i)
691
649
self.log("k._weave=" + pformat(k._weave))
693
651
for i, t in enumerate(texts):
694
self.assertEqual(k.get(i), t)
652
self.assertEqual(k.get_lines(i), t)
696
654
self.check_read_write(k)
700
class MergeCases(TestBase):
701
def doMerge(self, base, a, b, mp):
702
from cStringIO import StringIO
703
from textwrap import dedent
709
w.add('text0', [], map(addcrlf, base))
710
w.add('text1', [0], map(addcrlf, a))
711
w.add('text2', [0], map(addcrlf, b))
713
self.log('weave is:')
716
self.log(tmpf.getvalue())
718
self.log('merge plan:')
719
p = list(w.plan_merge(1, 2))
720
for state, line in p:
722
self.log('%12s | %s' % (state, line[:-1]))
726
mt.writelines(w.weave_merge(p))
728
self.log(mt.getvalue())
730
mp = map(addcrlf, mp)
731
self.assertEqual(mt.readlines(), mp)
734
def testOneInsert(self):
740
def testSeparateInserts(self):
741
self.doMerge(['aaa', 'bbb', 'ccc'],
742
['aaa', 'xxx', 'bbb', 'ccc'],
743
['aaa', 'bbb', 'yyy', 'ccc'],
744
['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
746
def testSameInsert(self):
747
self.doMerge(['aaa', 'bbb', 'ccc'],
748
['aaa', 'xxx', 'bbb', 'ccc'],
749
['aaa', 'xxx', 'bbb', 'yyy', 'ccc'],
750
['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
752
def testOverlappedInsert(self):
753
self.doMerge(['aaa', 'bbb'],
754
['aaa', 'xxx', 'yyy', 'bbb'],
755
['aaa', 'xxx', 'bbb'],
756
['aaa', '<<<<', 'xxx', 'yyy', '====', 'xxx', '>>>>', 'bbb'])
758
# really it ought to reduce this to
759
# ['aaa', 'xxx', 'yyy', 'bbb']
762
def testClashReplace(self):
763
self.doMerge(['aaa'],
766
['<<<<', 'xxx', '====', 'yyy', 'zzz', '>>>>'])
768
def testNonClashInsert(self):
769
self.doMerge(['aaa'],
772
['<<<<', 'xxx', 'aaa', '====', 'yyy', 'zzz', '>>>>'])
774
self.doMerge(['aaa'],
780
def testDeleteAndModify(self):
781
"""Clashing delete and modification.
783
If one side modifies a region and the other deletes it then
784
there should be a conflict with one side blank.
787
#######################################
788
# skippd, not working yet
791
self.doMerge(['aaa', 'bbb', 'ccc'],
792
['aaa', 'ddd', 'ccc'],
794
['<<<<', 'aaa', '====', '>>>>', 'ccc'])
797
657
class JoinWeavesTests(TestBase):
799
660
super(JoinWeavesTests, self).setUp()
800
661
self.weave1 = Weave()
801
662
self.lines1 = ['hello\n']
802
663
self.lines3 = ['hello\n', 'cruel\n', 'world\n']
803
self.weave1.add('v1', [], self.lines1)
804
self.weave1.add('v2', [0], ['hello\n', 'world\n'])
805
self.weave1.add('v3', [1], self.lines3)
807
def test_join_empty(self):
808
"""Join two empty weaves."""
809
eq = self.assertEqual
813
eq(w1.numversions(), 0)
815
def test_join_empty_to_nonempty(self):
816
"""Join empty weave onto nonempty."""
817
self.weave1.join(Weave())
818
self.assertEqual(len(self.weave1), 3)
820
def test_join_unrelated(self):
821
"""Join two weaves with no history in common."""
823
wb.add('b1', [], ['line from b\n'])
826
eq = self.assertEqual
828
eq(sorted(list(w1.iter_names())),
829
['b1', 'v1', 'v2', 'v3'])
831
def test_join_related(self):
832
wa = self.weave1.copy()
833
wb = self.weave1.copy()
834
wa.add('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
835
wb.add('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
836
eq = self.assertEquals
841
eq(wa.get_lines('b1'),
842
['hello\n', 'pale blue\n', 'world\n'])
844
def test_join_parent_disagreement(self):
845
"""Cannot join weaves with different parents for a version."""
848
wa.add('v1', [], ['hello\n'])
850
wb.add('v1', ['v0'], ['hello\n'])
851
self.assertRaises(WeaveError,
854
def test_join_text_disagreement(self):
855
"""Cannot join weaves with different texts for a version."""
858
wa.add('v1', [], ['hello\n'])
859
wb.add('v1', [], ['not\n', 'hello\n'])
860
self.assertRaises(WeaveError,
863
def test_join_unordered(self):
864
"""Join weaves where indexes differ.
866
The source weave contains a different version at index 0."""
867
wa = self.weave1.copy()
869
wb.add('x1', [], ['line from x1\n'])
870
wb.add('v1', [], ['hello\n'])
871
wb.add('v2', ['v1'], ['hello\n', 'world\n'])
873
eq = self.assertEquals
874
eq(sorted(wa.iter_names()), ['v1', 'v2', 'v3', 'x1',])
875
eq(wa.get_text('x1'), 'line from x1\n')
877
def test_reweave_with_empty(self):
879
wr = reweave(self.weave1, wb)
880
eq = self.assertEquals
881
eq(sorted(wr.iter_names()), ['v1', 'v2', 'v3'])
882
eq(wr.get_lines('v3'), ['hello\n', 'cruel\n', 'world\n'])
883
self.weave1.reweave(wb)
884
self.assertEquals(wr, self.weave1)
886
def test_join_with_ghosts_raises_parent_mismatch(self):
887
wa = self.weave1.copy()
889
wb.add('x1', [], ['line from x1\n'])
890
wb.add('v1', [], ['hello\n'])
891
wb.add('v2', ['v1', 'x1'], ['hello\n', 'world\n'])
892
self.assertRaises(errors.WeaveParentMismatch, wa.join, wb)
894
def test_reweave_with_ghosts(self):
895
"""Join that inserts parents of an existing revision.
897
This can happen when merging from another branch who
898
knows about revisions the destination does not. In
899
this test the second weave knows of an additional parent of
900
v2. Any revisions which are in common still have to have the
902
wa = self.weave1.copy()
904
wb.add('x1', [], ['line from x1\n'])
905
wb.add('v1', [], ['hello\n'])
906
wb.add('v2', ['v1', 'x1'], ['hello\n', 'world\n'])
908
eq = self.assertEquals
909
eq(sorted(wc.iter_names()), ['v1', 'v2', 'v3', 'x1',])
910
eq(wc.get_text('x1'), 'line from x1\n')
911
eq(wc.get_lines('v2'), ['hello\n', 'world\n'])
912
eq(wc.parent_names('v2'), ['v1', 'x1'])
913
self.weave1.reweave(wb)
914
self.assertEquals(wc, self.weave1)
917
if __name__ == '__main__':
920
sys.exit(unittest.main())
664
self.weave1.add_lines('v1', [], self.lines1)
665
self.weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
666
self.weave1.add_lines('v3', ['v2'], self.lines3)
668
def test_written_detection(self):
669
# Test detection of weave file corruption.
671
# Make sure that we can detect if a weave file has
672
# been corrupted. This doesn't test all forms of corruption,
673
# but it at least helps verify the data you get, is what you want.
674
from cStringIO import StringIO
677
w.add_lines('v1', [], ['hello\n'])
678
w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
683
# Because we are corrupting, we need to make sure we have the exact text
684
self.assertEqual('# bzr weave file v5\n'
685
'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
686
'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
687
'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
690
# Change a single letter
691
tmpf = StringIO('# bzr weave file v5\n'
692
'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
693
'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
694
'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
698
self.assertEqual('hello\n', w.get_text('v1'))
699
self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
700
self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
701
self.assertRaises(errors.WeaveInvalidChecksum, w.check)
703
# Change the sha checksum
704
tmpf = StringIO('# bzr weave file v5\n'
705
'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
706
'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
707
'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
711
self.assertEqual('hello\n', w.get_text('v1'))
712
self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
713
self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
714
self.assertRaises(errors.WeaveInvalidChecksum, w.check)
717
class TestWeave(TestCase):
719
def test_allow_reserved_false(self):
720
w = Weave('name', allow_reserved=False)
721
# Add lines is checked at the WeaveFile level, not at the Weave level
722
w.add_lines('name:', [], TEXT_1)
723
# But get_lines is checked at this level
724
self.assertRaises(errors.ReservedId, w.get_lines, 'name:')
726
def test_allow_reserved_true(self):
727
w = Weave('name', allow_reserved=True)
728
w.add_lines('name:', [], TEXT_1)
729
self.assertEqual(TEXT_1, w.get_lines('name:'))
732
class InstrumentedWeave(Weave):
733
"""Keep track of how many times functions are called."""
735
def __init__(self, weave_name=None):
736
self._extract_count = 0
737
Weave.__init__(self, weave_name=weave_name)
739
def _extract(self, versions):
740
self._extract_count += 1
741
return Weave._extract(self, versions)
744
class TestNeedsReweave(TestCase):
745
"""Internal corner cases for when reweave is needed."""
747
def test_compatible_parents(self):
749
my_parents = set([1, 2, 3])
751
self.assertTrue(w1._compatible_parents(my_parents, set([3])))
753
self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
754
# same empty corner case
755
self.assertTrue(w1._compatible_parents(set(), set()))
756
# other cannot contain stuff my_parents does not
757
self.assertFalse(w1._compatible_parents(set(), set([1])))
758
self.assertFalse(w1._compatible_parents(my_parents, set([1, 2, 3, 4])))
759
self.assertFalse(w1._compatible_parents(my_parents, set([4])))
762
class TestWeaveFile(TestCaseInTempDir):
764
def test_empty_file(self):
765
f = open('empty.weave', 'wb+')
767
self.assertRaises(errors.WeaveFormatError,