1
# Copyright (C) 2005, 2006 by Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
23
import difflib

from bzrlib.errors import KnitError, RevisionAlreadyPresent
from bzrlib.knit import (
    KnitAnnotateFactory,
    KnitPlainFactory,
    KnitVersionedFile,
    WeaveToKnit,
    )
from bzrlib.osutils import split_lines
from bzrlib.tests import TestCaseWithTransport
from bzrlib.transport import TransportLogger, get_transport
from bzrlib.transport.memory import MemoryTransport
from bzrlib.weave import Weave
36
class KnitTests(TestCaseWithTransport):
    """Class containing knit test helper routines."""

    def make_test_knit(self, annotate=False):
        """Create a writable knit named 'test' in the current directory.

        annotate -- when true the knit is built with a KnitAnnotateFactory,
            otherwise with a KnitPlainFactory.
        """
        # The flag used to be ignored, so make_test_knit(True) silently
        # produced a plain knit even for the annotation tests.
        if annotate:
            factory = KnitAnnotateFactory()
        else:
            factory = KnitPlainFactory()
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
                                 factory=factory, create=True)
47
class BasicKnitTests(KnitTests):
49
def add_stock_one_and_one_a(self, k):
    """Add 'text-1' (no parents) and its child 'text-1a' to k."""
    stock = (
        ('text-1', [], TEXT_1),
        ('text-1a', ['text-1'], TEXT_1A),
    )
    for version_id, parents, text in stock:
        k.add_lines(version_id, parents, split_lines(text))
53
def test_knit_constructor(self):
    """Construct empty k"""
    # The body used to be empty, so the test passed without ever building
    # a knit.
    self.make_test_knit()
57
def test_knit_add(self):
    """A single stored text is reported as present and reads back intact."""
    knit = self.make_test_knit()
    knit.add_lines('text-1', [], split_lines(TEXT_1))
    self.assertTrue(knit.has_version('text-1'))
    stored = ''.join(knit.get_lines('text-1'))
    self.assertEqualDiff(stored, TEXT_1)
64
def test_knit_reload(self):
    """Content added through one knit object is visible after reopening."""
    writer = self.make_test_knit()
    writer.add_lines('text-1', [], split_lines(TEXT_1))
    # Open the same knit a second time, read-only, and inspect it.
    reader = KnitVersionedFile('test', get_transport('.'), access_mode='r',
                               factory=KnitPlainFactory(), create=True)
    self.assertTrue(reader.has_version('text-1'))
    reloaded = ''.join(reader.get_lines('text-1'))
    self.assertEqualDiff(reloaded, TEXT_1)
73
def test_knit_several(self):
    """Two parentless texts stored in one knit both read back intact."""
    knit = self.make_test_knit()
    texts = (('text-1', TEXT_1), ('text-2', TEXT_2))
    for version_id, text in texts:
        knit.add_lines(version_id, [], split_lines(text))
    for version_id, text in texts:
        self.assertEqualDiff(''.join(knit.get_lines(version_id)), text)
81
def test_repeated_add(self):
    """Knit traps attempt to replace existing version"""
    k = self.make_test_knit()
    k.add_lines('text-1', [], split_lines(TEXT_1))
    # assertRaises takes the callable right after the exception class;
    # without it the version id string would be "called" and the knit
    # would never be exercised.
    self.assertRaises(RevisionAlreadyPresent,
                      k.add_lines,
                      'text-1', [], split_lines(TEXT_1))
90
# NOTE(review): no `def` header is visible for the statements below.  They
# store an empty text as 'text-1' and read it back as an empty list --
# presumably a separate test method; TODO confirm against the full file.
k = self.make_test_knit(True)
91
k.add_lines('text-1', [], [])
92
self.assertEquals(k.get_lines('text-1'), [])
94
def test_incomplete(self):
    """Test if texts without a ending line-end can be inserted and
    retrieved.

    The second version also contains a bare carriage return inside a line.
    """
    k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
    k.add_lines('text-1', [], ['a\n', 'b'])
    k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
    # reopening ensures maximum room for confusion
    k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
    self.assertEquals(k.get_lines('text-1'), ['a\n', 'b'])
    self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
105
def test_delta(self):
    """The TEXT_1 -> TEXT_1A delta matches delta_1_1a and re-applies."""
    # The knit itself is not used below; it is still created, as before.
    self.make_test_knit()
    delta = list(line_delta(TEXT_1.splitlines(True),
                            TEXT_1A.splitlines(True)))
    self.assertEqualDiff(''.join(delta), delta_1_1a)
    patched = apply_line_delta(TEXT_1.splitlines(True), delta)
    self.assertEqualDiff(''.join(patched), TEXT_1A)
114
def test_add_with_parents(self):
    """get_parents reports the parent lists given when the texts were added."""
    knit = self.make_test_knit()
    self.add_stock_one_and_one_a(knit)
    expected_parents = (('text-1', []), ('text-1a', ['text-1']))
    for version_id, parents in expected_parents:
        self.assertEquals(knit.get_parents(version_id), parents)
121
def test_ancestry(self):
    """The ancestry of 'text-1a' consists of itself and 'text-1'."""
    knit = self.make_test_knit()
    self.add_stock_one_and_one_a(knit)
    ancestry = set(knit.get_ancestry(['text-1a']))
    self.assertEquals(ancestry, set(['text-1a', 'text-1']))
127
def test_add_delta(self):
    """A child text added to a delta=True knit reads back as TEXT_1A."""
    transport = get_transport('.')
    knit = KnitVersionedFile('test', transport, factory=KnitPlainFactory(),
                             delta=True, create=True)
    self.add_stock_one_and_one_a(knit)
    child_text = ''.join(knit.get_lines('text-1a'))
    self.assertEqualDiff(child_text, TEXT_1A)
135
def test_annotate(self):
    """Run the small annotate scenario against a delta=True annotated knit."""
    transport = get_transport('.')
    annotated = KnitVersionedFile('knit', transport,
                                  factory=KnitAnnotateFactory(),
                                  delta=True, create=True)
    self.insert_and_test_small_annotate(annotated)
141
def insert_and_test_small_annotate(self, k):
    """Add a parent and a one-line-changed child to k, then check annotate.

    The unchanged line must be credited to 'text-1' and the changed line
    to 'text-2'.
    """
    k.add_lines('text-1', [], ['a\n', 'b\n'])
    k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
    origins = k.annotate('text-2')
    expected = [('text-1', 'a\n'), ('text-2', 'c\n')]
    for position, origin in enumerate(expected):
        self.assertEquals(origins[position], origin)
150
def test_annotate_fulltext(self):
    """Run the small annotate scenario against a delta=False annotated knit."""
    transport = get_transport('.')
    annotated = KnitVersionedFile('knit', transport,
                                  factory=KnitAnnotateFactory(),
                                  delta=False, create=True)
    self.insert_and_test_small_annotate(annotated)
156
def test_annotate_merge_1(self):
    """A two-line merge credits one line to each of its two parents."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n'])
    knit.add_lines('text-a2', [], ['d\n', 'c\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
    origins = knit.annotate('text-am')
    expected = [('text-a2', 'd\n'), ('text-a1', 'b\n')]
    for position, origin in enumerate(expected):
        self.assertEquals(origins[position], origin)
165
def test_annotate_merge_2(self):
    """A merge taking its middle line from the second parent is annotated."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
    origins = knit.annotate('text-am')
    expected = [
        ('text-a1', 'a\n'),
        ('text-a2', 'y\n'),
        ('text-a1', 'c\n'),
    ]
    for position, origin in enumerate(expected):
        self.assertEquals(origins[position], origin)
175
def test_annotate_merge_9(self):
    """A line new in the merge is credited to the merge version itself."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
    origins = knit.annotate('text-am')
    expected = [
        ('text-am', 'k\n'),
        ('text-a2', 'y\n'),
        ('text-a1', 'c\n'),
    ]
    for position, origin in enumerate(expected):
        self.assertEquals(origins[position], origin)
185
def test_annotate_merge_3(self):
    """A new first line plus two lines kept from the second parent."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
    origins = knit.annotate('text-am')
    expected = [
        ('text-am', 'k\n'),
        ('text-a2', 'y\n'),
        ('text-a2', 'z\n'),
    ]
    for position, origin in enumerate(expected):
        self.assertEquals(origins[position], origin)
195
def test_annotate_merge_4(self):
    """Lines inherited through an intermediate parent keep their origin.

    'text-a3' descends from 'text-a1'; the merge of 'text-a2' and
    'text-a3' must credit 'a' and 'b' to 'text-a1' and 'z' to 'text-a2'.
    """
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
    knit.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
    origins = knit.annotate('text-am')
    expected = [
        ('text-a1', 'a\n'),
        ('text-a1', 'b\n'),
        ('text-a2', 'z\n'),
    ]
    for position, origin in enumerate(expected):
        self.assertEquals(origins[position], origin)
206
def test_annotate_merge_5(self):
    """A three-parent merge credits one line to each parent."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
    knit.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
    merge_parents = ['text-a1', 'text-a2', 'text-a3']
    knit.add_lines('text-am', merge_parents, ['a\n', 'e\n', 'z\n'])
    origins = knit.annotate('text-am')
    expected = [
        ('text-a1', 'a\n'),
        ('text-a2', 'e\n'),
        ('text-a3', 'z\n'),
    ]
    for position, origin in enumerate(expected):
        self.assertEquals(origins[position], origin)
219
def test_annotate_file_cherry_pick(self):
    """Lines restored from the older parent are credited to that parent.

    'text-3' has parents 'text-2' and 'text-1' but the content of
    'text-1'; every line must be annotated as coming from 'text-1'.
    """
    knit = self.make_test_knit(True)
    knit.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
    knit.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
    origins = knit.annotate('text-3')
    expected = [
        ('text-1', 'a\n'),
        ('text-1', 'b\n'),
        ('text-1', 'c\n'),
    ]
    for position, origin in enumerate(expected):
        self.assertEquals(origins[position], origin)
229
def test_knit_join(self):
    """Joining 'text-m' into an empty knit reports five versions copied.

    The target must afterwards contain the two root versions 'text-a'
    and 'text-c'.
    """
    source = KnitVersionedFile('test1', get_transport('.'),
                               factory=KnitPlainFactory(), create=True)
    graph = (
        ('text-a', []),
        ('text-b', ['text-a']),
        ('text-c', []),
        ('text-d', ['text-c']),
        ('text-m', ['text-b', 'text-d']),
    )
    for version_id, parents in graph:
        source.add_lines(version_id, parents, split_lines(TEXT_1))
    target = KnitVersionedFile('test2', get_transport('.'),
                               factory=KnitPlainFactory(), create=True)
    count = target.join(source, version_ids=['text-m'])
    self.assertEquals(count, 5)
    for root in ('text-a', 'text-c'):
        self.assertTrue(target.has_version(root))
246
def test_reannotate(self):
    """Annotations stay correct for a version joined back from another knit.

    Two annotated knits share 'text-b', then diverge; 'text-c' is created
    in the second knit and joined into the first, where its text and its
    annotation are checked.
    """
    first = KnitVersionedFile('knit1', get_transport('.'),
                              factory=KnitAnnotateFactory(), create=True)
    first.add_lines('text-a', [], ['a\n', 'b\n'])
    first.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])

    second = KnitVersionedFile('test2', get_transport('.'),
                               factory=KnitAnnotateFactory(), create=True)
    second.join(first, version_ids=['text-b'])

    # Let both knits grow independently before joining back.
    first.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
    second.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
    second.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])

    # test-c will have index 3
    first.join(second, version_ids=['text-c'])

    self.assertEquals(first.get_lines('text-c'), ['z\n', 'c\n'])

    origins = first.annotate('text-c')
    expected = [('text-c', 'z\n'), ('text-b', 'c\n')]
    for position, origin in enumerate(expected):
        self.assertEquals(origins[position], origin)
275
def test_get_line_delta_texts(self):
    """Make sure we can call get_texts on text with reused line deltas"""
    k1 = KnitVersionedFile('test1', get_transport('.'),
                           factory=KnitPlainFactory(), create=True)
    # Build the chain '0' <- '1' <- '2'.  `t` used to be referenced without
    # ever being bound; the first version has no predecessor to name as
    # its parent.
    for t in range(3):
        if t == 0:
            parents = []
        else:
            parents = ['%d' % (t-1)]
        k1.add_lines('%d' % t, parents, ['hello\n'] * t)
    k1.get_texts(('%d' % t) for t in range(3))
287
def test_iter_lines_reads_in_order(self):
    """Asking for versions last-first still yields one ordered read request.

    The logged transport calls must show a single request against
    'id.knit' with the ranges in ascending order.
    """
    logger = TransportLogger(MemoryTransport())
    knit = KnitVersionedFile('id', logger, create=True, delta=True)
    self.assertEqual([('id.kndx',)], logger._calls)
    # add texts with no required ordering
    knit.add_lines('base', [], ['text\n'])
    knit.add_lines('base2', [], ['text2\n'])
    # forget everything logged so far
    logger._calls = []
    # request a last-first iteration
    requested = ['base2', 'base']
    results = list(knit.iter_lines_added_or_present_in_versions(requested))
    self.assertEqual([('id.knit', [(0, 87), (87, 89)])], logger._calls)
    self.assertEqual(['text\n', 'text2\n'], results)
302
def test_create_empty_annotated(self):
    """create_empty keeps the annotate factory type and the delta setting."""
    original = self.make_test_knit(True)
    original.add_lines('text-a', [], ['a\n', 'b\n'])
    clone = original.create_empty('t', MemoryTransport())
    self.assertTrue(isinstance(clone.factory, KnitAnnotateFactory))
    self.assertEqual(original.delta, clone.delta)
    # the generic test checks for empty content and file class
311
def test_knit_format(self):
    """Check the on-disk index content as records are added.

    Also checks that a truncated trailing index record is ignored when the
    knit is loaded, and that the same version id can then be added.
    """
    # this tests that a new knit index file has the expected content
    # and that it writes the data we expect as records are added.
    knit = self.make_test_knit(True)
    self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
    knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
    # Each record is preceded by a newline, as the next assertion's
    # expected text shows; both calls compare against 'test.kndx'.
    self.assertFileEqual(
        "# bzr knit index 8\n"
        "\n"
        "revid fulltext 0 84 .a_ghost :",
        'test.kndx')
    knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
    self.assertFileEqual(
        "# bzr knit index 8\n"
        "\nrevid fulltext 0 84 .a_ghost :"
        "\nrevid2 line-delta 84 82 0 :",
        'test.kndx')
    # we should be able to load this file again
    knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
    self.assertEqual(['revid', 'revid2'], knit.versions())
    # write a short write to the file and ensure that it is ignored
    indexfile = file('test.kndx', 'at')
    try:
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
    finally:
        # close explicitly so the partial record is on disk before reload
        indexfile.close()
    # we should be able to load this file again
    knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
    self.assertEqual(['revid', 'revid2'], knit.versions())
    # and add a revision with the same id the failed write had
    knit.add_lines('revid3', ['revid2'], ['a\n'])
    # and when reading it revid3 should now appear.
    knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
    self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
    self.assertEqual(['revid2'], knit.get_parents('revid3'))
345
def test_plan_merge(self):
    """plan_merge of 'text1a' and 'text1b' yields the AB_MERGE entries."""
    knit = self.make_test_knit(annotate=True)
    knit.add_lines('text1', [], split_lines(TEXT_1))
    for version_id, text in (('text1a', TEXT_1A), ('text1b', TEXT_1B)):
        knit.add_lines(version_id, ['text1'], split_lines(text))
    plan = list(knit.plan_merge('text1a', 'text1b'))
    for actual, expected in zip(plan, AB_MERGE):
        self.assertEqual(actual, expected)
364
Banana cup cake recipe
374
Banana cup cake recipe
376
- bananas (do not use plantains!!!)
383
Banana cup cake recipe
399
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
404
new-b|- bananas (do not use plantains!!!)
405
unchanged|- broken tea cups
406
new-a|- self-raising flour
409
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
412
def line_delta(from_lines, to_lines):
    """Generate line-based delta from one text to another

    For every changed region this yields a header line 'start,end,count\\n'
    (the range replaced in from_lines and the number of replacement lines)
    followed by the replacement lines themselves, taken from to_lines.
    Unchanged regions produce no output.
    """
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
    for op in s.get_opcodes():
        if op[0] == 'equal':
            # unchanged regions carry no information in a delta
            continue
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
        for i in range(op[3], op[4]):
            yield to_lines[i]
423
def apply_line_delta(basis_lines, delta_lines):
    """Apply a line-based perfect diff

    basis_lines -- text to apply the patch to
    delta_lines -- diff instructions and content

    Each instruction is a header 'start,end,count' followed by `count`
    replacement lines for basis_lines[start:end].  Returns a new list;
    basis_lines is left untouched.
    """
    out = basis_lines[:]
    i = 0
    offset = 0
    while i < len(delta_lines):
        l = delta_lines[i]
        a, b, c = [int(field) for field in l.split(',')]
        i = i + 1
        out[offset+a:offset+b] = delta_lines[i:i+c]
        i = i + c
        # (b - a) basis lines were replaced by c lines, so every later basis
        # index moves by the difference (the old code added both terms).
        offset = offset + c - (b - a)
    return out
442
class TestWeaveToKnit(KnitTests):
    """Checks which source/target pairs WeaveToKnit.is_compatible accepts."""

    def test_weave_to_knit_matches(self):
        # check that the WeaveToKnit is_compatible function
        # registers True for a Weave to a Knit.
        # `w` was referenced without ever being created.
        w = Weave()
        k = self.make_test_knit()
        self.failUnless(WeaveToKnit.is_compatible(w, k))
        self.failIf(WeaveToKnit.is_compatible(k, w))
        self.failIf(WeaveToKnit.is_compatible(w, w))
        self.failIf(WeaveToKnit.is_compatible(k, k))
455
class TestKnitCaching(KnitTests):
457
def create_knit(self, cache_add=False):
    """Return a test knit (annotate=True) holding 'text-1' and 'text-2'.

    cache_add -- when true, caching is enabled before the texts are added,
        so the adds themselves go through the cache.
    """
    k = self.make_test_knit(True)
    if cache_add:
        # the flag used to be ignored
        k.enable_cache()
    k.add_lines('text-1', [], split_lines(TEXT_1))
    k.add_lines('text-2', [], split_lines(TEXT_2))
    # every caller uses the result, so it has to be handed back
    return k
466
def test_no_caching(self):
    """A knit built with default settings has an empty record cache."""
    knit = self.create_knit()
    # Nothing should be cached without setting 'enable_cache'
    cached = knit._data._cache
    self.assertEqual({}, cached)
471
def test_cache_add_and_clear(self):
    """Adds made with caching on are cached until the cache is cleared."""
    k = self.create_knit(True)

    self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))

    # Without this call the two assertions contradict each other.
    k.clear_cache()
    self.assertEqual({}, k._data._cache)
479
def test_cache_data_read_raw(self):
    """Raw record reads are cached while caching is on, not after a clear."""
    k = self.create_knit()

    # Now cache and read
    k.enable_cache()

    def read_one_raw(version):
        """Read exactly one raw record for `version` and return it."""
        pos_map = k._get_components_positions([version])
        method, pos, size, next = pos_map[version]
        lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
        self.assertEqual(1, len(lst))
        # the helper used to return nothing, leaving `val` as None
        return lst[0]

    val = read_one_raw('text-1')
    self.assertEqual({'text-1':val[1]}, k._data._cache)

    k.clear_cache()
    # After clear, new reads are not cached
    self.assertEqual({}, k._data._cache)

    val2 = read_one_raw('text-1')
    self.assertEqual(val, val2)
    self.assertEqual({}, k._data._cache)
503
def test_cache_data_read(self):
    """Parsed record reads populate the cache while caching is enabled.

    The cached raw data must parse to the same content and digest that the
    read returned; after a clear, further reads leave the cache empty.
    """
    k = self.create_knit()

    def read_one(version):
        """Read exactly one parsed record for `version` and return it."""
        pos_map = k._get_components_positions([version])
        method, pos, size, next = pos_map[version]
        lst = list(k._data.read_records_iter([(version, pos, size)]))
        self.assertEqual(1, len(lst))
        # the helper used to return nothing, leaving `val` as None
        return lst[0]

    # Now cache and read
    k.enable_cache()

    val = read_one('text-2')
    self.assertEqual(['text-2'], k._data._cache.keys())
    self.assertEqual('text-2', val[0])
    content, digest = k._data._parse_record('text-2',
                                            k._data._cache['text-2'])
    self.assertEqual(content, val[1])
    self.assertEqual(digest, val[2])

    k.clear_cache()
    self.assertEqual({}, k._data._cache)

    val2 = read_one('text-2')
    self.assertEqual(val, val2)
    self.assertEqual({}, k._data._cache)
531
def test_cache_read(self):
    """get_text caches the record it reads only while caching is enabled."""
    k = self.create_knit()

    # Now cache and read; without enabling the cache the assertion on the
    # cache keys below could never hold.
    k.enable_cache()

    text = k.get_text('text-1')
    self.assertEqual(TEXT_1, text)
    self.assertEqual(['text-1'], k._data._cache.keys())

    k.clear_cache()
    self.assertEqual({}, k._data._cache)

    text = k.get_text('text-1')
    self.assertEqual(TEXT_1, text)
    self.assertEqual({}, k._data._cache)