1
# Copyright (C) 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent, NoSuchFile
24
from bzrlib.knit import (
29
from bzrlib.osutils import split_lines
30
from bzrlib.tests import TestCaseWithTransport
31
from bzrlib.transport import TransportLogger, get_transport
32
from bzrlib.transport.memory import MemoryTransport
33
from bzrlib.weave import Weave
36
class KnitTests(TestCaseWithTransport):
37
"""Class containing knit test helper routines."""
39
def make_test_knit(self, annotate=False, delay_create=False):
41
factory = KnitPlainFactory()
44
return KnitVersionedFile('test', get_transport('.'), access_mode='w',
45
factory=factory, create=True,
46
delay_create=delay_create)
49
class BasicKnitTests(KnitTests):
51
def add_stock_one_and_one_a(self, k):
52
k.add_lines('text-1', [], split_lines(TEXT_1))
53
k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
55
def test_knit_constructor(self):
56
"""Construct empty k"""
59
def test_knit_add(self):
60
"""Store one text in knit and retrieve"""
61
k = self.make_test_knit()
62
k.add_lines('text-1', [], split_lines(TEXT_1))
63
self.assertTrue(k.has_version('text-1'))
64
self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
66
def test_knit_reload(self):
67
# test that the content in a reloaded knit is correct
68
k = self.make_test_knit()
69
k.add_lines('text-1', [], split_lines(TEXT_1))
71
k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
72
self.assertTrue(k2.has_version('text-1'))
73
self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
75
def test_knit_several(self):
76
"""Store several texts in a knit"""
77
k = self.make_test_knit()
78
k.add_lines('text-1', [], split_lines(TEXT_1))
79
k.add_lines('text-2', [], split_lines(TEXT_2))
80
self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
81
self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
83
def test_repeated_add(self):
84
"""Knit traps attempt to replace existing version"""
85
k = self.make_test_knit()
86
k.add_lines('text-1', [], split_lines(TEXT_1))
87
self.assertRaises(RevisionAlreadyPresent,
89
'text-1', [], split_lines(TEXT_1))
92
k = self.make_test_knit(True)
93
k.add_lines('text-1', [], [])
94
self.assertEquals(k.get_lines('text-1'), [])
96
def test_incomplete(self):
97
"""Test if texts without a ending line-end can be inserted and
99
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
100
k.add_lines('text-1', [], ['a\n', 'b' ])
101
k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
102
# reopening ensures maximum room for confusion
103
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
104
self.assertEquals(k.get_lines('text-1'), ['a\n', 'b' ])
105
self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
107
def test_delta(self):
108
"""Expression of knit delta as lines"""
109
k = self.make_test_knit()
110
td = list(line_delta(TEXT_1.splitlines(True),
111
TEXT_1A.splitlines(True)))
112
self.assertEqualDiff(''.join(td), delta_1_1a)
113
out = apply_line_delta(TEXT_1.splitlines(True), td)
114
self.assertEqualDiff(''.join(out), TEXT_1A)
116
def test_add_with_parents(self):
117
"""Store in knit with parents"""
118
k = self.make_test_knit()
119
self.add_stock_one_and_one_a(k)
120
self.assertEquals(k.get_parents('text-1'), [])
121
self.assertEquals(k.get_parents('text-1a'), ['text-1'])
123
def test_ancestry(self):
124
"""Store in knit with parents"""
125
k = self.make_test_knit()
126
self.add_stock_one_and_one_a(k)
127
self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
129
def test_add_delta(self):
130
"""Store in knit with parents"""
131
k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
132
delta=True, create=True)
133
self.add_stock_one_and_one_a(k)
135
self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
137
def test_annotate(self):
139
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
140
delta=True, create=True)
141
self.insert_and_test_small_annotate(k)
143
def insert_and_test_small_annotate(self, k):
144
"""test annotation with k works correctly."""
145
k.add_lines('text-1', [], ['a\n', 'b\n'])
146
k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
148
origins = k.annotate('text-2')
149
self.assertEquals(origins[0], ('text-1', 'a\n'))
150
self.assertEquals(origins[1], ('text-2', 'c\n'))
152
def test_annotate_fulltext(self):
154
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
155
delta=False, create=True)
156
self.insert_and_test_small_annotate(k)
158
def test_annotate_merge_1(self):
159
k = self.make_test_knit(True)
160
k.add_lines('text-a1', [], ['a\n', 'b\n'])
161
k.add_lines('text-a2', [], ['d\n', 'c\n'])
162
k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
163
origins = k.annotate('text-am')
164
self.assertEquals(origins[0], ('text-a2', 'd\n'))
165
self.assertEquals(origins[1], ('text-a1', 'b\n'))
167
def test_annotate_merge_2(self):
168
k = self.make_test_knit(True)
169
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
170
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
171
k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
172
origins = k.annotate('text-am')
173
self.assertEquals(origins[0], ('text-a1', 'a\n'))
174
self.assertEquals(origins[1], ('text-a2', 'y\n'))
175
self.assertEquals(origins[2], ('text-a1', 'c\n'))
177
def test_annotate_merge_9(self):
178
k = self.make_test_knit(True)
179
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
180
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
181
k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
182
origins = k.annotate('text-am')
183
self.assertEquals(origins[0], ('text-am', 'k\n'))
184
self.assertEquals(origins[1], ('text-a2', 'y\n'))
185
self.assertEquals(origins[2], ('text-a1', 'c\n'))
187
def test_annotate_merge_3(self):
188
k = self.make_test_knit(True)
189
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
190
k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
191
k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
192
origins = k.annotate('text-am')
193
self.assertEquals(origins[0], ('text-am', 'k\n'))
194
self.assertEquals(origins[1], ('text-a2', 'y\n'))
195
self.assertEquals(origins[2], ('text-a2', 'z\n'))
197
def test_annotate_merge_4(self):
198
k = self.make_test_knit(True)
199
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
200
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
201
k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
202
k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
203
origins = k.annotate('text-am')
204
self.assertEquals(origins[0], ('text-a1', 'a\n'))
205
self.assertEquals(origins[1], ('text-a1', 'b\n'))
206
self.assertEquals(origins[2], ('text-a2', 'z\n'))
208
def test_annotate_merge_5(self):
209
k = self.make_test_knit(True)
210
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
211
k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
212
k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
213
k.add_lines('text-am',
214
['text-a1', 'text-a2', 'text-a3'],
215
['a\n', 'e\n', 'z\n'])
216
origins = k.annotate('text-am')
217
self.assertEquals(origins[0], ('text-a1', 'a\n'))
218
self.assertEquals(origins[1], ('text-a2', 'e\n'))
219
self.assertEquals(origins[2], ('text-a3', 'z\n'))
221
def test_annotate_file_cherry_pick(self):
222
k = self.make_test_knit(True)
223
k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
224
k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
225
k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
226
origins = k.annotate('text-3')
227
self.assertEquals(origins[0], ('text-1', 'a\n'))
228
self.assertEquals(origins[1], ('text-1', 'b\n'))
229
self.assertEquals(origins[2], ('text-1', 'c\n'))
231
def test_knit_join(self):
232
"""Store in knit with parents"""
233
k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
234
k1.add_lines('text-a', [], split_lines(TEXT_1))
235
k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
237
k1.add_lines('text-c', [], split_lines(TEXT_1))
238
k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
240
k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
242
k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
243
count = k2.join(k1, version_ids=['text-m'])
244
self.assertEquals(count, 5)
245
self.assertTrue(k2.has_version('text-a'))
246
self.assertTrue(k2.has_version('text-c'))
248
def test_reannotate(self):
249
k1 = KnitVersionedFile('knit1', get_transport('.'),
250
factory=KnitAnnotateFactory(), create=True)
252
k1.add_lines('text-a', [], ['a\n', 'b\n'])
254
k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
256
k2 = KnitVersionedFile('test2', get_transport('.'),
257
factory=KnitAnnotateFactory(), create=True)
258
k2.join(k1, version_ids=['text-b'])
261
k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
263
k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
265
k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
267
# test-c will have index 3
268
k1.join(k2, version_ids=['text-c'])
270
lines = k1.get_lines('text-c')
271
self.assertEquals(lines, ['z\n', 'c\n'])
273
origins = k1.annotate('text-c')
274
self.assertEquals(origins[0], ('text-c', 'z\n'))
275
self.assertEquals(origins[1], ('text-b', 'c\n'))
277
def test_get_line_delta_texts(self):
278
"""Make sure we can call get_texts on text with reused line deltas"""
279
k1 = KnitVersionedFile('test1', get_transport('.'),
280
factory=KnitPlainFactory(), create=True)
285
parents = ['%d' % (t-1)]
286
k1.add_lines('%d' % t, parents, ['hello\n'] * t)
287
k1.get_texts(('%d' % t) for t in range(3))
289
def test_iter_lines_reads_in_order(self):
290
t = MemoryTransport()
291
instrumented_t = TransportLogger(t)
292
k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
293
self.assertEqual([('id.kndx',)], instrumented_t._calls)
294
# add texts with no required ordering
295
k1.add_lines('base', [], ['text\n'])
296
k1.add_lines('base2', [], ['text2\n'])
298
instrumented_t._calls = []
299
# request a last-first iteration
300
results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
301
self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
302
self.assertEqual(['text\n', 'text2\n'], results)
304
def test_create_empty_annotated(self):
305
k1 = self.make_test_knit(True)
307
k1.add_lines('text-a', [], ['a\n', 'b\n'])
308
k2 = k1.create_empty('t', MemoryTransport())
309
self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
310
self.assertEqual(k1.delta, k2.delta)
311
# the generic test checks for empty content and file class
313
def test_knit_format(self):
314
# this tests that a new knit index file has the expected content
315
# and that is writes the data we expect as records are added.
316
knit = self.make_test_knit(True)
317
# Now knit files are not created until we first add data to them
318
self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
319
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
320
self.assertFileEqual(
321
"# bzr knit index 8\n"
323
"revid fulltext 0 84 .a_ghost :",
325
knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
326
self.assertFileEqual(
327
"# bzr knit index 8\n"
328
"\nrevid fulltext 0 84 .a_ghost :"
329
"\nrevid2 line-delta 84 82 0 :",
331
# we should be able to load this file again
332
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
333
self.assertEqual(['revid', 'revid2'], knit.versions())
334
# write a short write to the file and ensure that its ignored
335
indexfile = file('test.kndx', 'at')
336
indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
338
# we should be able to load this file again
339
knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
340
self.assertEqual(['revid', 'revid2'], knit.versions())
341
# and add a revision with the same id the failed write had
342
knit.add_lines('revid3', ['revid2'], ['a\n'])
343
# and when reading it revid3 should now appear.
344
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
345
self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
346
self.assertEqual(['revid2'], knit.get_parents('revid3'))
348
def test_delay_create(self):
349
"""Test that passing delay_create=True creates files late"""
350
knit = self.make_test_knit(annotate=True, delay_create=True)
351
self.failIfExists('test.knit')
352
self.failIfExists('test.kndx')
353
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
354
self.failUnlessExists('test.knit')
355
self.assertFileEqual(
356
"# bzr knit index 8\n"
358
"revid fulltext 0 84 .a_ghost :",
361
def test_create_parent_dir(self):
362
"""create_parent_dir can create knits in nonexistant dirs"""
363
# Has no effect if we don't set 'delay_create'
364
trans = get_transport('.')
365
self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
366
trans, access_mode='w', factory=None,
367
create=True, create_parent_dir=True)
368
# Nothing should have changed yet
369
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
370
factory=None, create=True,
371
create_parent_dir=True,
373
self.failIfExists('dir/test.knit')
374
self.failIfExists('dir/test.kndx')
375
self.failIfExists('dir')
376
knit.add_lines('revid', [], ['a\n'])
377
self.failUnlessExists('dir')
378
self.failUnlessExists('dir/test.knit')
379
self.assertFileEqual(
380
"# bzr knit index 8\n"
382
"revid fulltext 0 84 :",
385
def test_create_mode_700(self):
386
trans = get_transport('.')
387
if not trans._can_roundtrip_unix_modebits():
388
# Can't roundtrip, so no need to run this test
390
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
391
factory=None, create=True,
392
create_parent_dir=True,
396
knit.add_lines('revid', [], ['a\n'])
397
self.assertTransportMode(trans, 'dir', 0700)
398
self.assertTransportMode(trans, 'dir/test.knit', 0600)
399
self.assertTransportMode(trans, 'dir/test.kndx', 0600)
401
def test_create_mode_770(self):
402
trans = get_transport('.')
403
if not trans._can_roundtrip_unix_modebits():
404
# Can't roundtrip, so no need to run this test
406
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
407
factory=None, create=True,
408
create_parent_dir=True,
412
knit.add_lines('revid', [], ['a\n'])
413
self.assertTransportMode(trans, 'dir', 0770)
414
self.assertTransportMode(trans, 'dir/test.knit', 0660)
415
self.assertTransportMode(trans, 'dir/test.kndx', 0660)
417
def test_create_mode_777(self):
418
trans = get_transport('.')
419
if not trans._can_roundtrip_unix_modebits():
420
# Can't roundtrip, so no need to run this test
422
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
423
factory=None, create=True,
424
create_parent_dir=True,
428
knit.add_lines('revid', [], ['a\n'])
429
self.assertTransportMode(trans, 'dir', 0777)
430
self.assertTransportMode(trans, 'dir/test.knit', 0666)
431
self.assertTransportMode(trans, 'dir/test.kndx', 0666)
433
def test_plan_merge(self):
434
my_knit = self.make_test_knit(annotate=True)
435
my_knit.add_lines('text1', [], split_lines(TEXT_1))
436
my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
437
my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
438
plan = list(my_knit.plan_merge('text1a', 'text1b'))
439
for plan_line, expected_line in zip(plan, AB_MERGE):
440
self.assertEqual(plan_line, expected_line)
452
Banana cup cake recipe
462
Banana cup cake recipe
464
- bananas (do not use plantains!!!)
471
Banana cup cake recipe
487
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
492
new-b|- bananas (do not use plantains!!!)
493
unchanged|- broken tea cups
494
new-a|- self-raising flour
497
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
500
def line_delta(from_lines, to_lines):
501
"""Generate line-based delta from one text to another"""
502
s = difflib.SequenceMatcher(None, from_lines, to_lines)
503
for op in s.get_opcodes():
506
yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
507
for i in range(op[3], op[4]):
511
def apply_line_delta(basis_lines, delta_lines):
512
"""Apply a line-based perfect diff
514
basis_lines -- text to apply the patch to
515
delta_lines -- diff instructions and content
520
while i < len(delta_lines):
522
a, b, c = map(long, l.split(','))
524
out[offset+a:offset+b] = delta_lines[i:i+c]
526
offset = offset + (b - a) + c
530
class TestWeaveToKnit(KnitTests):
532
def test_weave_to_knit_matches(self):
533
# check that the WeaveToKnit is_compatible function
534
# registers True for a Weave to a Knit.
536
k = self.make_test_knit()
537
self.failUnless(WeaveToKnit.is_compatible(w, k))
538
self.failIf(WeaveToKnit.is_compatible(k, w))
539
self.failIf(WeaveToKnit.is_compatible(w, w))
540
self.failIf(WeaveToKnit.is_compatible(k, k))
543
class TestKnitCaching(KnitTests):
545
def create_knit(self, cache_add=False):
546
k = self.make_test_knit(True)
550
k.add_lines('text-1', [], split_lines(TEXT_1))
551
k.add_lines('text-2', [], split_lines(TEXT_2))
554
def test_no_caching(self):
555
k = self.create_knit()
556
# Nothing should be cached without setting 'enable_cache'
557
self.assertEqual({}, k._data._cache)
559
def test_cache_add_and_clear(self):
560
k = self.create_knit(True)
562
self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
565
self.assertEqual({}, k._data._cache)
567
def test_cache_data_read_raw(self):
568
k = self.create_knit()
573
def read_one_raw(version):
574
pos_map = k._get_components_positions([version])
575
method, pos, size, next = pos_map[version]
576
lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
577
self.assertEqual(1, len(lst))
580
val = read_one_raw('text-1')
581
self.assertEqual({'text-1':val[1]}, k._data._cache)
584
# After clear, new reads are not cached
585
self.assertEqual({}, k._data._cache)
587
val2 = read_one_raw('text-1')
588
self.assertEqual(val, val2)
589
self.assertEqual({}, k._data._cache)
591
def test_cache_data_read(self):
592
k = self.create_knit()
594
def read_one(version):
595
pos_map = k._get_components_positions([version])
596
method, pos, size, next = pos_map[version]
597
lst = list(k._data.read_records_iter([(version, pos, size)]))
598
self.assertEqual(1, len(lst))
604
val = read_one('text-2')
605
self.assertEqual(['text-2'], k._data._cache.keys())
606
self.assertEqual('text-2', val[0])
607
content, digest = k._data._parse_record('text-2',
608
k._data._cache['text-2'])
609
self.assertEqual(content, val[1])
610
self.assertEqual(digest, val[2])
613
self.assertEqual({}, k._data._cache)
615
val2 = read_one('text-2')
616
self.assertEqual(val, val2)
617
self.assertEqual({}, k._data._cache)
619
def test_cache_read(self):
620
k = self.create_knit()
623
text = k.get_text('text-1')
624
self.assertEqual(TEXT_1, text)
625
self.assertEqual(['text-1'], k._data._cache.keys())
628
self.assertEqual({}, k._data._cache)
630
text = k.get_text('text-1')
631
self.assertEqual(TEXT_1, text)
632
self.assertEqual({}, k._data._cache)
635
class TestKnitIndex(KnitTests):
637
def test_add_versions_dictionary_compresses(self):
638
"""Adding versions to the index should update the lookup dict"""
639
knit = self.make_test_knit()
641
idx.add_version('a-1', ['fulltext'], 0, 0, [])
642
self.check_file_contents('test.kndx',
643
'# bzr knit index 8\n'
647
idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
648
('a-3', ['fulltext'], 0, 0, ['a-2']),
650
self.check_file_contents('test.kndx',
651
'# bzr knit index 8\n'
653
'a-1 fulltext 0 0 :\n'
654
'a-2 fulltext 0 0 0 :\n'
655
'a-3 fulltext 0 0 1 :'
657
self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
658
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
659
'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
660
'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
663
def test_add_versions_fails_clean(self):
664
"""If add_versions fails in the middle, it restores a pristine state.
666
Any modifications that are made to the index are reset if all versions
669
# This cheats a little bit by passing in a generator which will
670
# raise an exception before the processing finishes
671
# Other possibilities would be to have an version with the wrong number
672
# of entries, or to make the backing transport unable to write any
675
knit = self.make_test_knit()
677
idx.add_version('a-1', ['fulltext'], 0, 0, [])
679
class StopEarly(Exception):
682
def generate_failure():
683
"""Add some entries and then raise an exception"""
684
yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
685
yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
688
# Assert the pre-condition
689
self.assertEqual(['a-1'], idx._history)
690
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
692
self.assertRaises(StopEarly, idx.add_versions, generate_failure())
694
# And it shouldn't be modified
695
self.assertEqual(['a-1'], idx._history)
696
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)