23
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent
24
from bzrlib.knit import (
24
from bzrlib.knit import KnitVersionedFile, KnitPlainFactory, KnitAnnotateFactory
29
25
from bzrlib.osutils import split_lines
30
from bzrlib.tests import TestCaseWithTransport
31
from bzrlib.transport import TransportLogger, get_transport
26
from bzrlib.tests import TestCaseInTempDir
27
from bzrlib.transport.local import LocalTransport
32
28
from bzrlib.transport.memory import MemoryTransport
33
from bzrlib.weave import Weave
36
class KnitTests(TestCaseWithTransport):
37
"""Class containing knit test helper routines."""
29
from bzrlib.transactions import PassThroughTransaction
32
class KnitTests(TestCaseInTempDir):
34
def add_stock_one_and_one_a(self, k):
35
k.add_lines('text-1', [], split_lines(TEXT_1))
36
k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
38
def test_knit_constructor(self):
39
"""Construct empty k"""
39
42
def make_test_knit(self, annotate=False):
41
44
factory = KnitPlainFactory()
44
return KnitVersionedFile('test', get_transport('.'), access_mode='w', factory=factory, create=True)
47
class BasicKnitTests(KnitTests):
49
def add_stock_one_and_one_a(self, k):
50
k.add_lines('text-1', [], split_lines(TEXT_1))
51
k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
53
def test_knit_constructor(self):
54
"""Construct empty k"""
47
return KnitVersionedFile('test', LocalTransport('.'), access_mode='w', factory=factory, create=True)
57
49
def test_knit_add(self):
58
50
"""Store one text in knit and retrieve"""
62
54
self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
64
56
def test_knit_reload(self):
65
# test that the content in a reloaded knit is correct
57
"""Store and reload a knit"""
66
58
k = self.make_test_knit()
67
59
k.add_lines('text-1', [], split_lines(TEXT_1))
69
k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
61
k2 = KnitVersionedFile('test', LocalTransport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
70
62
self.assertTrue(k2.has_version('text-1'))
71
63
self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
94
86
def test_incomplete(self):
95
87
"""Test if texts without a ending line-end can be inserted and
97
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
89
k = KnitVersionedFile('test', LocalTransport('.'), delta=False, create=True)
98
90
k.add_lines('text-1', [], ['a\n', 'b' ])
99
91
k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
100
# reopening ensures maximum room for confusion
101
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
102
92
self.assertEquals(k.get_lines('text-1'), ['a\n', 'b' ])
103
93
self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
127
117
def test_add_delta(self):
128
118
"""Store in knit with parents"""
129
k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
119
k = KnitVersionedFile('test', LocalTransport('.'), factory=KnitPlainFactory(),
130
120
delta=True, create=True)
131
121
self.add_stock_one_and_one_a(k)
133
122
self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
135
124
def test_annotate(self):
136
125
"""Annotations"""
137
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
126
k = KnitVersionedFile('knit', LocalTransport('.'), factory=KnitAnnotateFactory(),
138
127
delta=True, create=True)
139
128
self.insert_and_test_small_annotate(k)
150
139
def test_annotate_fulltext(self):
151
140
"""Annotations"""
152
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
141
k = KnitVersionedFile('knit', LocalTransport('.'), factory=KnitAnnotateFactory(),
153
142
delta=False, create=True)
154
143
self.insert_and_test_small_annotate(k)
229
218
def test_knit_join(self):
230
219
"""Store in knit with parents"""
231
k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
220
k1 = KnitVersionedFile('test1', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
232
221
k1.add_lines('text-a', [], split_lines(TEXT_1))
233
222
k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
238
227
k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
240
k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
229
k2 = KnitVersionedFile('test2', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
241
230
count = k2.join(k1, version_ids=['text-m'])
242
231
self.assertEquals(count, 5)
243
232
self.assertTrue(k2.has_version('text-a'))
244
233
self.assertTrue(k2.has_version('text-c'))
246
235
def test_reannotate(self):
247
k1 = KnitVersionedFile('knit1', get_transport('.'),
236
k1 = KnitVersionedFile('knit1', LocalTransport('.'),
248
237
factory=KnitAnnotateFactory(), create=True)
250
239
k1.add_lines('text-a', [], ['a\n', 'b\n'])
252
241
k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
254
k2 = KnitVersionedFile('test2', get_transport('.'),
243
k2 = KnitVersionedFile('test2', LocalTransport('.'),
255
244
factory=KnitAnnotateFactory(), create=True)
256
245
k2.join(k1, version_ids=['text-b'])
269
258
self.assertEquals(lines, ['z\n', 'c\n'])
271
260
origins = k1.annotate('text-c')
272
self.assertEquals(origins[0], ('text-c', 'z\n'))
273
self.assertEquals(origins[1], ('text-b', 'c\n'))
275
def test_get_line_delta_texts(self):
276
"""Make sure we can call get_texts on text with reused line deltas"""
277
k1 = KnitVersionedFile('test1', get_transport('.'),
278
factory=KnitPlainFactory(), create=True)
283
parents = ['%d' % (t-1)]
284
k1.add_lines('%d' % t, parents, ['hello\n'] * t)
285
k1.get_texts(('%d' % t) for t in range(3))
287
def test_iter_lines_reads_in_order(self):
288
t = MemoryTransport()
289
instrumented_t = TransportLogger(t)
290
k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
291
self.assertEqual([('id.kndx',)], instrumented_t._calls)
292
# add texts with no required ordering
293
k1.add_lines('base', [], ['text\n'])
294
k1.add_lines('base2', [], ['text2\n'])
296
instrumented_t._calls = []
297
# request a last-first iteration
298
results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
299
self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
300
self.assertEqual(['text\n', 'text2\n'], results)
261
self.assertEquals(origins[0], ('text-c', 'z\n'))
262
self.assertEquals(origins[1], ('text-b', 'c\n'))
302
264
def test_create_empty_annotated(self):
303
265
k1 = self.make_test_knit(True)
308
270
self.assertEqual(k1.delta, k2.delta)
309
271
# the generic test checks for empty content and file class
311
def test_knit_format(self):
312
# this tests that a new knit index file has the expected content
313
# and that is writes the data we expect as records are added.
314
knit = self.make_test_knit(True)
315
self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
316
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
317
self.assertFileEqual(
318
"# bzr knit index 8\n"
320
"revid fulltext 0 84 .a_ghost :",
322
knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
323
self.assertFileEqual(
324
"# bzr knit index 8\n"
325
"\nrevid fulltext 0 84 .a_ghost :"
326
"\nrevid2 line-delta 84 82 0 :",
328
# we should be able to load this file again
329
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
330
self.assertEqual(['revid', 'revid2'], knit.versions())
331
# write a short write to the file and ensure that its ignored
332
indexfile = file('test.kndx', 'at')
333
indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
335
# we should be able to load this file again
336
knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
337
self.assertEqual(['revid', 'revid2'], knit.versions())
338
# and add a revision with the same id the failed write had
339
knit.add_lines('revid3', ['revid2'], ['a\n'])
340
# and when reading it revid3 should now appear.
341
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
342
self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
343
self.assertEqual(['revid2'], knit.get_parents('revid3'))
345
def test_plan_merge(self):
346
my_knit = self.make_test_knit(annotate=True)
347
my_knit.add_lines('text1', [], split_lines(TEXT_1))
348
my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
349
my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
350
plan = list(my_knit.plan_merge('text1a', 'text1b'))
351
for plan_line, expected_line in zip(plan, AB_MERGE):
352
self.assertEqual(plan_line, expected_line)
356
275
Banana cup cakes:
399
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
404
new-b|- bananas (do not use plantains!!!)
405
unchanged|- broken tea cups
406
new-a|- self-raising flour
409
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
412
310
def line_delta(from_lines, to_lines):
413
311
"""Generate line-based delta from one text to another"""
414
312
s = difflib.SequenceMatcher(None, from_lines, to_lines)
438
336
offset = offset + (b - a) + c
442
class TestWeaveToKnit(KnitTests):
444
def test_weave_to_knit_matches(self):
445
# check that the WeaveToKnit is_compatible function
446
# registers True for a Weave to a Knit.
448
k = self.make_test_knit()
449
self.failUnless(WeaveToKnit.is_compatible(w, k))
450
self.failIf(WeaveToKnit.is_compatible(k, w))
451
self.failIf(WeaveToKnit.is_compatible(w, w))
452
self.failIf(WeaveToKnit.is_compatible(k, k))