1
# Copyright (C) 2005, 2006 by Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent
24
from bzrlib.knit import (
29
from bzrlib.osutils import split_lines
30
from bzrlib.tests import TestCaseWithTransport
31
from bzrlib.transport import TransportLogger
32
from bzrlib.transport.local import LocalTransport
33
from bzrlib.transport.memory import MemoryTransport
34
from bzrlib.weave import Weave
37
class KnitTests(TestCaseWithTransport):
    """Class containing knit test helper routines."""

    def make_test_knit(self, annotate=False):
        """Open the knit named 'test' in the current directory for writing.

        annotate -- when true the knit is built with a KnitAnnotateFactory,
            otherwise with a KnitPlainFactory.

        Returns the KnitVersionedFile (opened with create=True).
        """
        # Callers pass annotate=True and then rely on annotation data, so the
        # flag must actually select the factory.
        if annotate:
            factory = KnitAnnotateFactory()
        else:
            factory = KnitPlainFactory()
        return KnitVersionedFile('test', LocalTransport('.'), access_mode='w',
                                 factory=factory, create=True)
48
class BasicKnitTests(KnitTests):
    """Storage, retrieval, annotation, join and index-format tests for knits."""

    def add_stock_one_and_one_a(self, k):
        """Add TEXT_1 as 'text-1' and TEXT_1A as its child 'text-1a' to k."""
        k.add_lines('text-1', [], split_lines(TEXT_1))
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))

    def test_knit_constructor(self):
        """Construct empty k"""
        self.make_test_knit()

    def test_knit_add(self):
        """Store one text in knit and retrieve"""
        k = self.make_test_knit()
        k.add_lines('text-1', [], split_lines(TEXT_1))
        self.assertTrue(k.has_version('text-1'))
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)

    def test_knit_reload(self):
        """The content of a reloaded knit is correct."""
        k = self.make_test_knit()
        k.add_lines('text-1', [], split_lines(TEXT_1))
        k2 = KnitVersionedFile('test', LocalTransport('.'), access_mode='r',
                               factory=KnitPlainFactory(), create=True)
        self.assertTrue(k2.has_version('text-1'))
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)

    def test_knit_several(self):
        """Store several texts in a knit"""
        k = self.make_test_knit()
        k.add_lines('text-1', [], split_lines(TEXT_1))
        k.add_lines('text-2', [], split_lines(TEXT_2))
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)

    def test_repeated_add(self):
        """Knit traps attempt to replace existing version"""
        k = self.make_test_knit()
        k.add_lines('text-1', [], split_lines(TEXT_1))
        # assertRaises needs the callable first, then its arguments.
        self.assertRaises(RevisionAlreadyPresent,
                          k.add_lines,
                          'text-1', [], split_lines(TEXT_1))

    def test_empty(self):
        """An empty text can be stored and read back as an empty list."""
        # Kept separate from test_repeated_add: both use the version id
        # 'text-1' against the knit named 'test'.
        k = self.make_test_knit(True)
        k.add_lines('text-1', [], [])
        self.assertEqual(k.get_lines('text-1'), [])

    def test_incomplete(self):
        """Test if texts without an ending line-end can be inserted and
        extracted."""
        k = KnitVersionedFile('test', LocalTransport('.'), delta=False,
                              create=True)
        k.add_lines('text-1', [], ['a\n', 'b'])
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
        # reopening ensures maximum room for confusion
        k = KnitVersionedFile('test', LocalTransport('.'), delta=False,
                              create=True)
        self.assertEqual(k.get_lines('text-1'), ['a\n', 'b'])
        self.assertEqual(k.get_lines('text-2'), ['a\rb\n', 'b\n'])

    def test_delta(self):
        """Expression of knit delta as lines"""
        basis = TEXT_1.splitlines(True)
        target = TEXT_1A.splitlines(True)
        td = list(line_delta(basis, target))
        self.assertEqualDiff(''.join(td), delta_1_1a)
        out = apply_line_delta(basis, td)
        self.assertEqualDiff(''.join(out), TEXT_1A)

    def test_add_with_parents(self):
        """Store in knit with parents"""
        k = self.make_test_knit()
        self.add_stock_one_and_one_a(k)
        self.assertEqual(k.get_parents('text-1'), [])
        self.assertEqual(k.get_parents('text-1a'), ['text-1'])

    def test_ancestry(self):
        """The ancestry of a child version includes itself and its parent."""
        k = self.make_test_knit()
        self.add_stock_one_and_one_a(k)
        self.assertEqual(set(k.get_ancestry(['text-1a'])),
                         set(['text-1a', 'text-1']))

    def test_add_delta(self):
        """A child text stored in a delta=True knit reads back intact."""
        k = KnitVersionedFile('test', LocalTransport('.'),
                              factory=KnitPlainFactory(),
                              delta=True, create=True)
        self.add_stock_one_and_one_a(k)
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)

    def test_annotate(self):
        """Annotation works on a knit created with delta=True."""
        k = KnitVersionedFile('knit', LocalTransport('.'),
                              factory=KnitAnnotateFactory(),
                              delta=True, create=True)
        self.insert_and_test_small_annotate(k)

    def insert_and_test_small_annotate(self, k):
        """test annotation with k works correctly."""
        k.add_lines('text-1', [], ['a\n', 'b\n'])
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])

        origins = k.annotate('text-2')
        self.assertEqual(origins[0], ('text-1', 'a\n'))
        self.assertEqual(origins[1], ('text-2', 'c\n'))

    def test_annotate_fulltext(self):
        """Annotation works on a knit created with delta=False."""
        k = KnitVersionedFile('knit', LocalTransport('.'),
                              factory=KnitAnnotateFactory(),
                              delta=False, create=True)
        self.insert_and_test_small_annotate(k)

    def test_annotate_merge_1(self):
        k = self.make_test_knit(True)
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
        origins = k.annotate('text-am')
        self.assertEqual(origins[0], ('text-a2', 'd\n'))
        self.assertEqual(origins[1], ('text-a1', 'b\n'))

    def test_annotate_merge_2(self):
        k = self.make_test_knit(True)
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
        origins = k.annotate('text-am')
        self.assertEqual(origins[0], ('text-a1', 'a\n'))
        self.assertEqual(origins[1], ('text-a2', 'y\n'))
        self.assertEqual(origins[2], ('text-a1', 'c\n'))

    def test_annotate_merge_9(self):
        k = self.make_test_knit(True)
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
        origins = k.annotate('text-am')
        self.assertEqual(origins[0], ('text-am', 'k\n'))
        self.assertEqual(origins[1], ('text-a2', 'y\n'))
        self.assertEqual(origins[2], ('text-a1', 'c\n'))

    def test_annotate_merge_3(self):
        k = self.make_test_knit(True)
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
        origins = k.annotate('text-am')
        self.assertEqual(origins[0], ('text-am', 'k\n'))
        self.assertEqual(origins[1], ('text-a2', 'y\n'))
        self.assertEqual(origins[2], ('text-a2', 'z\n'))

    def test_annotate_merge_4(self):
        k = self.make_test_knit(True)
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
        origins = k.annotate('text-am')
        self.assertEqual(origins[0], ('text-a1', 'a\n'))
        self.assertEqual(origins[1], ('text-a1', 'b\n'))
        self.assertEqual(origins[2], ('text-a2', 'z\n'))

    def test_annotate_merge_5(self):
        k = self.make_test_knit(True)
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
        k.add_lines('text-am',
                    ['text-a1', 'text-a2', 'text-a3'],
                    ['a\n', 'e\n', 'z\n'])
        origins = k.annotate('text-am')
        self.assertEqual(origins[0], ('text-a1', 'a\n'))
        self.assertEqual(origins[1], ('text-a2', 'e\n'))
        self.assertEqual(origins[2], ('text-a3', 'z\n'))

    def test_annotate_file_cherry_pick(self):
        k = self.make_test_knit(True)
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
        origins = k.annotate('text-3')
        self.assertEqual(origins[0], ('text-1', 'a\n'))
        self.assertEqual(origins[1], ('text-1', 'b\n'))
        self.assertEqual(origins[2], ('text-1', 'c\n'))

    def test_knit_join(self):
        """Joining one version into another knit reports five versions."""
        k1 = KnitVersionedFile('test1', LocalTransport('.'),
                               factory=KnitPlainFactory(), create=True)
        k1.add_lines('text-a', [], split_lines(TEXT_1))
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))

        k1.add_lines('text-c', [], split_lines(TEXT_1))
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))

        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))

        k2 = KnitVersionedFile('test2', LocalTransport('.'),
                               factory=KnitPlainFactory(), create=True)
        count = k2.join(k1, version_ids=['text-m'])
        self.assertEqual(count, 5)
        self.assertTrue(k2.has_version('text-a'))
        self.assertTrue(k2.has_version('text-c'))

    def test_reannotate(self):
        """Annotations stay correct after joining knits that have diverged."""
        k1 = KnitVersionedFile('knit1', LocalTransport('.'),
                               factory=KnitAnnotateFactory(), create=True)
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])

        k2 = KnitVersionedFile('test2', LocalTransport('.'),
                               factory=KnitAnnotateFactory(), create=True)
        k2.join(k1, version_ids=['text-b'])

        # Each knit now gains versions the other one does not have.
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])

        # test-c will have index 3
        k1.join(k2, version_ids=['text-c'])

        lines = k1.get_lines('text-c')
        self.assertEqual(lines, ['z\n', 'c\n'])

        origins = k1.annotate('text-c')
        self.assertEqual(origins[0], ('text-c', 'z\n'))
        self.assertEqual(origins[1], ('text-b', 'c\n'))

    def test_extraction_reads_components_once(self):
        """Check the transport calls logged while texts are added and read.

        NOTE(review): in this copy of the file several statements between
        the assertions below are absent (the steps that would trigger the
        expected 'id.knit' reads).  The visible statements are kept in
        their original order; the missing steps must be restored before
        the read-expecting assertions can pass.
        """
        t = MemoryTransport()
        instrumented_t = TransportLogger(t)
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
        # should read the index
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
        instrumented_t._calls = []

        k1.add_lines('base', [], ['text\n'])
        # should not have read at all
        self.assertEqual([], instrumented_t._calls)

        k1.add_lines('sub', ['base'], ['text\n', 'text2\n'])
        # should not have read at all
        self.assertEqual([], instrumented_t._calls)

        # NOTE(review): step missing here.
        # should not have read at all
        self.assertEqual([], instrumented_t._calls)

        # NOTE(review): step(s) missing here.
        # should have read the first component only
        self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
        instrumented_t._calls = []

        # NOTE(review): step missing here.
        # should not have read at all
        self.assertEqual([], instrumented_t._calls)
        # and now read the other component
        # NOTE(review): step missing here.
        # should have read the second component
        self.assertEqual([('id.knit', [(87, 93)])], instrumented_t._calls)
        instrumented_t._calls = []

        # NOTE(review): step(s) missing here.
        k1.add_lines('sub2', ['base'], ['text\n', 'text3\n'])
        # should read the first component only
        self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)

    def test_iter_lines_reads_in_order(self):
        """Lines requested last-first are still read from the file in order."""
        t = MemoryTransport()
        instrumented_t = TransportLogger(t)
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
        # add texts with no required ordering
        k1.add_lines('base', [], ['text\n'])
        k1.add_lines('base2', [], ['text2\n'])

        instrumented_t._calls = []
        # request a last-first iteration
        results = list(
            k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])],
                         instrumented_t._calls)
        self.assertEqual(['text\n', 'text2\n'], results)

    def test_create_empty_annotated(self):
        """create_empty on an annotated knit keeps factory type and delta."""
        k1 = self.make_test_knit(True)
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
        k2 = k1.create_empty('t', MemoryTransport())
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
        self.assertEqual(k1.delta, k2.delta)
        # the generic test checks for empty content and file class

    def test_knit_format(self):
        """Check the on-disk index content as records are added."""
        # this tests that a new knit index file has the expected content
        # and that it writes the data we expect as records are added.
        knit = self.make_test_knit(True)
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
        self.assertFileEqual(
            "# bzr knit index 8\n"
            "\n"
            "revid fulltext 0 84 .a_ghost :",
            'test.kndx')
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
        self.assertFileEqual(
            "# bzr knit index 8\n"
            "\nrevid fulltext 0 84 .a_ghost :"
            "\nrevid2 line-delta 84 82 0 :",
            'test.kndx')
        # we should be able to load this file again
        knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='r')
        self.assertEqual(['revid', 'revid2'], knit.versions())
        # write a short write to the file and ensure that it's ignored
        indexfile = open('test.kndx', 'at')
        try:
            indexfile.write(
                '\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
        finally:
            # Close so the partial record is on disk before the index is
            # reopened below.
            indexfile.close()
        # we should be able to load this file again
        knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='w')
        self.assertEqual(['revid', 'revid2'], knit.versions())
        # and add a revision with the same id the failed write had
        knit.add_lines('revid3', ['revid2'], ['a\n'])
        # and when reading it revid3 should now appear.
        knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='r')
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
        self.assertEqual(['revid2'], knit.get_parents('revid3'))

    def test_plan_merge(self):
        """plan_merge of two children of one text yields the AB_MERGE plan."""
        my_knit = self.make_test_knit(annotate=True)
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
        for plan_line, expected_line in zip(plan, AB_MERGE):
            self.assertEqual(plan_line, expected_line)
401
Banana cup cake recipe
411
Banana cup cake recipe
413
- bananas (do not use plantains!!!)
420
Banana cup cake recipe
436
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
441
new-b|- bananas (do not use plantains!!!)
442
unchanged|- broken tea cups
443
new-a|- self-raising flour
446
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
449
def line_delta(from_lines, to_lines):
    """Generate line-based delta from one text to another

    from_lines -- list of lines of the basis text
    to_lines -- list of lines of the target text

    Yields, for every changed region, a header line 'start,end,count\n'
    (start/end index the region in from_lines, count is the number of
    replacement lines) followed by those count lines taken from to_lines.
    """
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
    for tag, i1, i2, j1, j2 in s.get_opcodes():
        if tag == 'equal':
            # Unchanged regions carry no information in a delta.
            continue
        yield '%d,%d,%d\n' % (i1, i2, j2 - j1)
        for line in to_lines[j1:j2]:
            yield line
460
def apply_line_delta(basis_lines, delta_lines):
    """Apply a line-based perfect diff

    basis_lines -- text to apply the patch to
    delta_lines -- diff instructions and content

    Each instruction is a header line 'start,end,count' followed by count
    content lines; lines start..end of the basis are replaced by that
    content.  Returns a new list; basis_lines is not modified.
    """
    out = list(basis_lines)
    i = 0
    # Net number of lines added so far; header indices refer to the
    # original basis, so they must be shifted by this amount.
    offset = 0
    while i < len(delta_lines):
        a, b, c = map(int, delta_lines[i].split(','))
        i += 1
        out[offset + a:offset + b] = delta_lines[i:i + c]
        i += c
        # (b - a) lines were removed and c inserted.
        offset += c - (b - a)
    return out
479
class TestWeaveToKnit(KnitTests):
    """Tests for WeaveToKnit.is_compatible."""

    def test_weave_to_knit_matches(self):
        """Only the (Weave, Knit) pairing is reported as compatible."""
        # check that the WeaveToKnit is_compatible function
        # registers True for a Weave to a Knit.
        w = Weave()
        k = self.make_test_knit()
        self.assertTrue(WeaveToKnit.is_compatible(w, k))
        self.assertFalse(WeaveToKnit.is_compatible(k, w))
        self.assertFalse(WeaveToKnit.is_compatible(w, w))
        self.assertFalse(WeaveToKnit.is_compatible(k, k))