43
class TestBase(TestCase):
44
def check_read_write(self, k):
45
"""Check the weave k can be written & re-read."""
46
from tempfile import TemporaryFile
55
self.log('serialized weave:')
59
self.log('parents: %s' % (k._parents == k2._parents))
60
self.log(' %r' % k._parents)
61
self.log(' %r' % k2._parents)
63
self.fail('read/write check failed')
66
class WeaveContains(TestBase):
67
"""Weave __contains__ operator"""
70
self.assertFalse('foo' in k)
71
k.add_lines('foo', [], TEXT_1)
72
self.assertTrue('foo' in k)
33
75
class Easy(TestBase):
38
80
class StoreText(TestBase):
39
81
"""Store and retrieve a simple text."""
42
idx = k.add([], TEXT_0)
43
self.assertEqual(k.get(idx), TEXT_0)
83
def test_storing_text(self):
85
idx = k.add_lines('text0', [], TEXT_0)
86
self.assertEqual(k.get_lines(idx), TEXT_0)
44
87
self.assertEqual(idx, 0)
48
90
class AnnotateOne(TestBase):
52
self.assertEqual(k.annotate(0),
93
k.add_lines('text0', [], TEXT_0)
94
self.assertEqual(k.annotate('text0'),
95
[('text0', TEXT_0[0])])
56
98
class StoreTwo(TestBase):
60
idx = k.add([], TEXT_0)
102
idx = k.add_lines('text0', [], TEXT_0)
61
103
self.assertEqual(idx, 0)
63
idx = k.add([], TEXT_1)
105
idx = k.add_lines('text1', [], TEXT_1)
64
106
self.assertEqual(idx, 1)
66
self.assertEqual(k.get(0), TEXT_0)
67
self.assertEqual(k.get(1), TEXT_1)
73
class Delta1(TestBase):
74
"""Detection of changes prior to inserting new revision."""
76
from pprint import pformat
81
changes = list(k._delta(set([0]),
85
self.log('raw changes: ' + pformat(changes))
87
# should be one inserted line after line 0
88
self.assertEquals(changes,
89
[(1, 1, ['new line'])])
91
changes = k._delta(set([0]),
95
self.assertEquals(list(changes),
96
[(0, 0, ['top line'])])
108
self.assertEqual(k.get_lines(0), TEXT_0)
109
self.assertEqual(k.get_lines(1), TEXT_1)
112
class GetSha1(TestBase):
113
def test_get_sha1(self):
115
k.add_lines('text0', [], 'text0')
116
self.assertEqual('34dc0e430c642a26c3dd1c2beb7a8b4f4445eb79',
118
self.assertRaises(errors.RevisionNotPresent,
120
self.assertRaises(errors.RevisionNotPresent,
100
124
class InvalidAdd(TestBase):
101
125
"""Try to use invalid version number during add."""
102
126
def runTest(self):
105
self.assertRaises(IndexError,
129
self.assertRaises(errors.RevisionNotPresent,
136
class RepeatedAdd(TestBase):
137
"""Add the same version twice; harmless."""
140
idx = k.add_lines('text0', [], TEXT_0)
141
idx2 = k.add_lines('text0', [], TEXT_0)
142
self.assertEqual(idx, idx2)
145
class InvalidRepeatedAdd(TestBase):
148
k.add_lines('basis', [], TEXT_0)
149
idx = k.add_lines('text0', [], TEXT_0)
150
self.assertRaises(errors.RevisionAlreadyPresent,
154
['not the same text'])
155
self.assertRaises(errors.RevisionAlreadyPresent,
158
['basis'], # not the right parents
111
162
class InsertLines(TestBase):
112
163
"""Store a revision that adds one line to the original.
114
165
Look at the annotations to make sure that the first line is matched
115
166
and not stored repeatedly."""
116
167
def runTest(self):
119
k.add([], ['line 1'])
120
k.add([0], ['line 1', 'line 2'])
122
self.assertEqual(k.annotate(0),
125
self.assertEqual(k.get(1),
170
k.add_lines('text0', [], ['line 1'])
171
k.add_lines('text1', ['text0'], ['line 1', 'line 2'])
173
self.assertEqual(k.annotate('text0'),
174
[('text0', 'line 1')])
176
self.assertEqual(k.get_lines(1),
129
self.assertEqual(k.annotate(1),
133
k.add([0], ['line 1', 'diverged line'])
135
self.assertEqual(k.annotate(2),
137
(2, 'diverged line')])
140
['line 1', 'middle line', 'line 2'])
142
self.assertEqual(k.annotate(3),
180
self.assertEqual(k.annotate('text1'),
181
[('text0', 'line 1'),
182
('text1', 'line 2')])
184
k.add_lines('text2', ['text0'], ['line 1', 'diverged line'])
186
self.assertEqual(k.annotate('text2'),
187
[('text0', 'line 1'),
188
('text2', 'diverged line')])
190
text3 = ['line 1', 'middle line', 'line 2']
195
# self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
197
self.log("k._weave=" + pformat(k._weave))
199
self.assertEqual(k.annotate('text3'),
200
[('text0', 'line 1'),
201
('text3', 'middle line'),
202
('text1', 'line 2')])
204
# now multiple insertions at different places
206
['text0', 'text1', 'text3'],
207
['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
209
self.assertEqual(k.annotate('text4'),
210
[('text0', 'line 1'),
212
('text3', 'middle line'),
218
class DeleteLines(TestBase):
219
"""Deletion of lines from existing text.
221
Try various texts all based on a common ancestor."""
225
base_text = ['one', 'two', 'three', 'four']
227
k.add_lines('text0', [], base_text)
229
texts = [['one', 'two', 'three'],
230
['two', 'three', 'four'],
232
['one', 'two', 'three', 'four'],
237
ver = k.add_lines('text%d' % i,
241
self.log('final weave:')
242
self.log('k._weave=' + pformat(k._weave))
244
for i in range(len(texts)):
245
self.assertEqual(k.get_lines(i+1),
249
class SuicideDelete(TestBase):
250
"""Invalid weave which tries to add and delete simultaneously."""
256
k._weave = [('{', 0),
263
################################### SKIPPED
264
# Weave.get doesn't trap this anymore
267
self.assertRaises(WeaveFormatError,
272
class CannedDelete(TestBase):
273
"""Unpack canned weave with deleted lines."""
280
k._weave = [('{', 0),
283
'line to be deleted',
288
k._sha1s = [sha_string('first lineline to be deletedlast line')
289
, sha_string('first linelast line')]
291
self.assertEqual(k.get_lines(0),
293
'line to be deleted',
297
self.assertEqual(k.get_lines(1),
303
class CannedReplacement(TestBase):
304
"""Unpack canned weave with deleted lines."""
308
k._parents = [frozenset(),
311
k._weave = [('{', 0),
314
'line to be deleted',
322
k._sha1s = [sha_string('first lineline to be deletedlast line')
323
, sha_string('first linereplacement linelast line')]
325
self.assertEqual(k.get_lines(0),
327
'line to be deleted',
331
self.assertEqual(k.get_lines(1),
338
class BadWeave(TestBase):
339
"""Test that we trap an insert which should not occur."""
343
k._parents = [frozenset(),
345
k._weave = ['bad line',
349
' added in version 1',
358
################################### SKIPPED
359
# Weave.get doesn't trap this anymore
363
self.assertRaises(WeaveFormatError,
368
class BadInsert(TestBase):
369
"""Test that we trap an insert which should not occur."""
373
k._parents = [frozenset(),
378
k._weave = [('{', 0),
381
' added in version 1',
389
# this is not currently enforced by get
390
return ##########################################
392
self.assertRaises(WeaveFormatError,
396
self.assertRaises(WeaveFormatError,
401
class InsertNested(TestBase):
402
"""Insertion with nested instructions."""
406
k._parents = [frozenset(),
411
k._weave = [('{', 0),
414
' added in version 1',
423
k._sha1s = [sha_string('foo {}')
424
, sha_string('foo { added in version 1 also from v1}')
425
, sha_string('foo { added in v2}')
426
, sha_string('foo { added in version 1 added in v2 also from v1}')
429
self.assertEqual(k.get_lines(0),
433
self.assertEqual(k.get_lines(1),
435
' added in version 1',
439
self.assertEqual(k.get_lines(2),
444
self.assertEqual(k.get_lines(3),
446
' added in version 1',
452
class DeleteLines2(TestBase):
453
"""Test recording revisions that delete lines.
455
This relies on the weave having a way to represent lines knocked
456
out by a later revision."""
460
k.add_lines('text0', [], ["line the first",
465
self.assertEqual(len(k.get_lines(0)), 4)
467
k.add_lines('text1', ['text0'], ["line the first",
470
self.assertEqual(k.get_lines(1),
474
self.assertEqual(k.annotate('text1'),
475
[('text0', "line the first"),
149
479
class IncludeVersions(TestBase):
150
480
"""Check texts that are stored across multiple revisions.
152
Here we manually create a knit with particular encoding and make
482
Here we manually create a weave with particular encoding and make
153
483
sure it unpacks properly.
155
485
Text 0 includes nothing; text 1 includes text 0 and adds some
159
489
def runTest(self):
162
k._v = [VerInfo(), VerInfo(included=[0])]
163
k._l = [(0, "first line"),
166
self.assertEqual(k.get(1),
492
k._parents = [frozenset(), frozenset([0])]
493
k._weave = [('{', 0),
500
k._sha1s = [sha_string('first line')
501
, sha_string('first linesecond line')]
503
self.assertEqual(k.get_lines(1),
170
self.assertEqual(k.get(0),
507
self.assertEqual(k.get_lines(0),
173
k.dump(self.TEST_LOG)
176
511
class DivergedIncludes(TestBase):
177
"""Knit with two diverged texts based on version 0.
512
"""Weave with two diverged texts based on version 0.
179
514
def runTest(self):
183
VerInfo(included=[0]),
184
VerInfo(included=[0]),
186
k._l = [(0, "first line"),
188
(2, "alternative second line"),]
190
self.assertEqual(k.get(0),
515
# FIXME: make the weave, don't poke at it.
518
k._names = ['0', '1', '2']
519
k._name_map = {'0':0, '1':1, '2':2}
520
k._parents = [frozenset(),
524
k._weave = [('{', 0),
531
"alternative second line",
535
k._sha1s = [sha_string('first line')
536
, sha_string('first linesecond line')
537
, sha_string('first linealternative second line')]
539
self.assertEqual(k.get_lines(0),
193
self.assertEqual(k.get(1),
542
self.assertEqual(k.get_lines(1),
197
self.assertEqual(k.get(2),
546
self.assertEqual(k.get_lines('2'),
199
548
"alternative second line"])
203
from unittest import TestSuite, TestLoader
208
suite.addTest(tl.loadTestsFromModule(testknit))
210
return int(not testsweet.run_suite(suite)) # for shell 0=true
213
if __name__ == '__main__':
550
self.assertEqual(list(k.get_ancestry(['2'])),
554
class ReplaceLine(TestBase):
558
text0 = ['cheddar', 'stilton', 'gruyere']
559
text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
561
k.add_lines('text0', [], text0)
562
k.add_lines('text1', ['text0'], text1)
564
self.log('k._weave=' + pformat(k._weave))
566
self.assertEqual(k.get_lines(0), text0)
567
self.assertEqual(k.get_lines(1), text1)
570
class Merge(TestBase):
571
"""Storage of versions that merge diverged parents"""
576
['header', '', 'line from 1'],
577
['header', '', 'line from 2', 'more from 2'],
578
['header', '', 'line from 1', 'fixup line', 'line from 2'],
581
k.add_lines('text0', [], texts[0])
582
k.add_lines('text1', ['text0'], texts[1])
583
k.add_lines('text2', ['text0'], texts[2])
584
k.add_lines('merge', ['text0', 'text1', 'text2'], texts[3])
586
for i, t in enumerate(texts):
587
self.assertEqual(k.get_lines(i), t)
589
self.assertEqual(k.annotate('merge'),
590
[('text0', 'header'),
592
('text1', 'line from 1'),
593
('merge', 'fixup line'),
594
('text2', 'line from 2'),
597
self.assertEqual(list(k.get_ancestry(['merge'])),
598
['text0', 'text1', 'text2', 'merge'])
600
self.log('k._weave=' + pformat(k._weave))
602
self.check_read_write(k)
605
class Conflicts(TestBase):
606
"""Test detection of conflicting regions during a merge.
608
A base version is inserted, then two descendents try to
609
insert different lines in the same place. These should be
610
reported as a possible conflict and forwarded to the user."""
615
k.add_lines([], ['aaa', 'bbb'])
616
k.add_lines([0], ['aaa', '111', 'bbb'])
617
k.add_lines([1], ['aaa', '222', 'bbb'])
619
merged = k.merge([1, 2])
621
self.assertEquals([[['aaa']],
626
class NonConflict(TestBase):
627
"""Two descendants insert compatible changes.
629
No conflict should be reported."""
634
k.add_lines([], ['aaa', 'bbb'])
635
k.add_lines([0], ['111', 'aaa', 'ccc', 'bbb'])
636
k.add_lines([1], ['aaa', 'ccc', 'bbb', '222'])
639
class Khayyam(TestBase):
640
"""Test changes to multi-line texts, and read/write"""
642
def test_multi_line_merge(self):
644
"""A Book of Verses underneath the Bough,
645
A Jug of Wine, a Loaf of Bread, -- and Thou
646
Beside me singing in the Wilderness --
647
Oh, Wilderness were Paradise enow!""",
649
"""A Book of Verses underneath the Bough,
650
A Jug of Wine, a Loaf of Bread, -- and Thou
651
Beside me singing in the Wilderness --
652
Oh, Wilderness were Paradise now!""",
654
"""A Book of poems underneath the tree,
655
A Jug of Wine, a Loaf of Bread,
657
Beside me singing in the Wilderness --
658
Oh, Wilderness were Paradise now!
662
"""A Book of Verses underneath the Bough,
663
A Jug of Wine, a Loaf of Bread,
665
Beside me singing in the Wilderness --
666
Oh, Wilderness were Paradise now!""",
668
texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
674
ver = k.add_lines('text%d' % i,
676
parents.add('text%d' % i)
679
self.log("k._weave=" + pformat(k._weave))
681
for i, t in enumerate(texts):
682
self.assertEqual(k.get_lines(i), t)
684
self.check_read_write(k)
687
class JoinWeavesTests(TestBase):
689
super(JoinWeavesTests, self).setUp()
690
self.weave1 = Weave()
691
self.lines1 = ['hello\n']
692
self.lines3 = ['hello\n', 'cruel\n', 'world\n']
693
self.weave1.add_lines('v1', [], self.lines1)
694
self.weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
695
self.weave1.add_lines('v3', ['v2'], self.lines3)
697
def test_join_empty(self):
698
"""Join two empty weaves."""
699
eq = self.assertEqual
705
def test_join_empty_to_nonempty(self):
    """Joining an empty weave into a populated one adds no versions."""
    empty = Weave()
    self.weave1.join(empty)
    # weave1 was given three versions (v1, v2, v3) beforehand; the
    # count must still be three after the join.
    self.assertEqual(len(self.weave1), 3)
710
def test_join_unrelated(self):
711
"""Join two weaves with no history in common."""
713
wb.add_lines('b1', [], ['line from b\n'])
716
eq = self.assertEqual
718
eq(sorted(w1.versions()),
719
['b1', 'v1', 'v2', 'v3'])
721
def test_join_related(self):
722
wa = self.weave1.copy()
723
wb = self.weave1.copy()
724
wa.add_lines('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
725
wb.add_lines('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
726
eq = self.assertEquals
731
eq(wa.get_lines('b1'),
732
['hello\n', 'pale blue\n', 'world\n'])
734
def test_join_parent_disagreement(self):
735
# join reconciles differing parents into a union.
738
wa.add_lines('v1', [], ['hello\n'])
739
wb.add_lines('v0', [], [])
740
wb.add_lines('v1', ['v0'], ['hello\n'])
742
self.assertEqual(['v0'], wa.get_parents('v1'))
744
def test_join_text_disagreement(self):
745
"""Cannot join weaves with different texts for a version."""
748
wa.add_lines('v1', [], ['hello\n'])
749
wb.add_lines('v1', [], ['not\n', 'hello\n'])
750
self.assertRaises(WeaveError,
753
def test_join_unordered(self):
754
"""Join weaves where indexes differ.
756
The source weave contains a different version at index 0."""
757
wa = self.weave1.copy()
759
wb.add_lines('x1', [], ['line from x1\n'])
760
wb.add_lines('v1', [], ['hello\n'])
761
wb.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
763
eq = self.assertEquals
764
eq(sorted(wa.versions()), ['v1', 'v2', 'v3', 'x1',])
765
eq(wa.get_text('x1'), 'line from x1\n')
767
def test_written_detection(self):
768
# Test detection of weave file corruption.
770
# Make sure that we can detect if a weave file has
771
# been corrupted. This doesn't test all forms of corruption,
772
# but it at least helps verify the data you get, is what you want.
773
from cStringIO import StringIO
776
w.add_lines('v1', [], ['hello\n'])
777
w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
782
# Because we are corrupting, we need to make sure we have the exact text
783
self.assertEquals('# bzr weave file v5\n'
784
'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
785
'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
786
'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
789
# Change a single letter
790
tmpf = StringIO('# bzr weave file v5\n'
791
'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
792
'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
793
'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
797
self.assertEqual('hello\n', w.get_text('v1'))
798
self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
799
self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
800
self.assertRaises(errors.WeaveInvalidChecksum, w.check)
802
# Change the sha checksum
803
tmpf = StringIO('# bzr weave file v5\n'
804
'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
805
'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
806
'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
810
self.assertEqual('hello\n', w.get_text('v1'))
811
self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
812
self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
813
self.assertRaises(errors.WeaveInvalidChecksum, w.check)
816
class InstrumentedWeave(Weave):
    """Weave subclass that counts how often ``_extract`` is invoked.

    The running total is kept in ``self._extract_count`` so that tests can
    inspect how many extractions an operation performed.
    """

    def __init__(self, weave_name=None):
        # Set the counter up first, then hand over to the base initialiser
        # (same ordering as the original code).
        self._extract_count = 0
        Weave.__init__(self, weave_name=weave_name)

    def _extract(self, versions):
        """Bump the call counter, then defer to ``Weave._extract``."""
        self._extract_count = self._extract_count + 1
        extracted = Weave._extract(self, versions)
        return extracted
828
class JoinOptimization(TestCase):
829
"""Test that Weave.join() doesn't extract all texts, only what must be done."""
832
w1 = InstrumentedWeave()
833
w2 = InstrumentedWeave()
836
txt1 = ['a\n', 'b\n']
837
txt2 = ['a\n', 'c\n']
838
txt3 = ['a\n', 'b\n', 'c\n']
840
w1.add_lines('txt0', [], txt0) # extract 1a
841
w2.add_lines('txt0', [], txt0) # extract 1b
842
w1.add_lines('txt1', ['txt0'], txt1)# extract 2a
843
w2.add_lines('txt2', ['txt0'], txt2)# extract 2b
844
w1.join(w2) # extract 3a to add txt2
845
w2.join(w1) # extract 3b to add txt1
847
w1.add_lines('txt3', ['txt1', 'txt2'], txt3) # extract 4a
848
w2.add_lines('txt3', ['txt2', 'txt1'], txt3) # extract 4b
849
# These secretly have inverted parents
851
# This should not have to do any extractions
852
w1.join(w2) # NO extract, texts already present with same parents
853
w2.join(w1) # NO extract, texts already present with same parents
855
self.assertEqual(4, w1._extract_count)
856
self.assertEqual(4, w2._extract_count)
858
def test_double_parent(self):
859
# It should not be considered illegal to add
860
# a revision with the same parent twice
861
w1 = InstrumentedWeave()
862
w2 = InstrumentedWeave()
865
txt1 = ['a\n', 'b\n']
866
txt2 = ['a\n', 'c\n']
867
txt3 = ['a\n', 'b\n', 'c\n']
869
w1.add_lines('txt0', [], txt0)
870
w2.add_lines('txt0', [], txt0)
871
w1.add_lines('txt1', ['txt0'], txt1)
872
w2.add_lines('txt1', ['txt0', 'txt0'], txt1)
873
# Same text, effectively the same, because the
874
# parent is only repeated
875
w1.join(w2) # extract 3a to add txt2
876
w2.join(w1) # extract 3b to add txt1
879
class TestNeedsReweave(TestCase):
880
"""Internal corner cases for when reweave is needed."""
882
def test_compatible_parents(self):
884
my_parents = set([1, 2, 3])
886
self.assertTrue(w1._compatible_parents(my_parents, set([3])))
888
self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
889
# same empty corner case
890
self.assertTrue(w1._compatible_parents(set(), set()))
891
# other cannot contain stuff my_parents does not
892
self.assertFalse(w1._compatible_parents(set(), set([1])))
893
self.assertFalse(w1._compatible_parents(my_parents, set([1, 2, 3, 4])))
894
self.assertFalse(w1._compatible_parents(my_parents, set([4])))
897
class TestWeaveFile(TestCaseInTempDir):
899
def test_empty_file(self):
900
f = open('empty.weave', 'wb+')
902
self.assertRaises(errors.WeaveFormatError,