1
# Copyright (C) 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
19
from cStringIO import StringIO
27
from bzrlib.errors import (
28
RevisionAlreadyPresent,
33
from bzrlib.knit import (
43
from bzrlib.osutils import split_lines
44
from bzrlib.tests import TestCase, TestCaseWithTransport
45
from bzrlib.transport import TransportLogger, get_transport
46
from bzrlib.transport.memory import MemoryTransport
47
from bzrlib.weave import Weave
50
class KnitContentTests(TestCase):
    """Unit tests for the KnitContent container of (origin, text) pairs."""

    def test_constructor(self):
        # Constructing from an empty list must simply not raise.
        content = KnitContent([])

    # NOTE(review): the header of this test was lost; the name is inferred
    # from the text() assertions it contains.
    def test_text(self):
        content = KnitContent([])
        self.assertEqual(content.text(), [])

        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        self.assertEqual(content.text(), ["text1", "text2"])

    def test_annotate(self):
        content = KnitContent([])
        self.assertEqual(content.annotate(), [])

        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        self.assertEqual(content.annotate(),
            [("origin1", "text1"), ("origin2", "text2")])

    def test_annotate_iter(self):
        content = KnitContent([])
        it = content.annotate_iter()
        self.assertRaises(StopIteration, it.next)

        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        it = content.annotate_iter()
        self.assertEqual(it.next(), ("origin1", "text1"))
        self.assertEqual(it.next(), ("origin2", "text2"))
        self.assertRaises(StopIteration, it.next)

    # NOTE(review): the header of this test and the assignment of ``copy``
    # were lost; ``content.copy()`` is assumed to be the KnitContent API
    # under test - confirm against bzrlib.knit.
    def test_copy(self):
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        copy = content.copy()
        self.assertIsInstance(copy, KnitContent)
        self.assertEqual(copy.annotate(),
            [("origin1", "text1"), ("origin2", "text2")])

    def test_line_delta(self):
        content1 = KnitContent([("", "a"), ("", "b")])
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
        self.assertEqual(content1.line_delta(content2),
            [(1, 2, 2, [("", "a"), ("", "c")])])

    def test_line_delta_iter(self):
        content1 = KnitContent([("", "a"), ("", "b")])
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
        it = content1.line_delta_iter(content2)
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
        self.assertRaises(StopIteration, it.next)
102
class MockTransport(object):
    """Minimal stand-in for a transport, used by the low-level knit tests.

    ``get`` and ``readv`` serve the content given at construction time.
    Any other attribute access yields a recorder that appends
    ``(name, args, kwargs)`` to ``self.calls`` so tests can inspect which
    transport methods were invoked.
    """

    def __init__(self, file_lines=None):
        """Create the mock.

        :param file_lines: list of strings served (joined with newlines) by
            ``get``, or None to make every ``get`` raise NoSuchFile.
        """
        self.file_lines = file_lines
        # Must be set here: a missing attribute would fall through to
        # __getattr__ and tests pop recorded calls from this list.
        self.calls = []
        # We have no base directory for the MockTransport
        self.base = None

    def get(self, filename):
        """Return a file-like object over the canned content.

        :raises NoSuchFile: when the mock was built without content.
        """
        if self.file_lines is None:
            raise NoSuchFile(filename)
        return StringIO("\n".join(self.file_lines))

    def readv(self, relpath, offsets):
        """Yield ``(offset, data)`` for each ``(offset, size)`` requested."""
        fp = self.get(relpath)
        for offset, size in offsets:
            # Honour the requested offset rather than reading sequentially.
            fp.seek(offset)
            yield offset, fp.read(size)

    def __getattr__(self, name):
        """Return a callable that records an invocation of ``name``."""
        def queue_call(*args, **kwargs):
            self.calls.append((name, args, kwargs))
        return queue_call
128
class LowLevelKnitDataTests(TestCase):
    """Tests of _KnitData record reading against gzip-compressed records.

    NOTE(review): the tail of every record literal was lost. The content
    lines and the trailing 'end rev-id-1' line were restored from the sha1
    inputs and the expected results in each test - confirm against the knit
    record format.
    """

    def create_gz_content(self, text):
        """Return ``text`` compressed as a complete gzip stream."""
        sio = StringIO()
        gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
        gz_file.write(text)
        # Closing writes the gzip trailer; without it the data is invalid.
        gz_file.close()
        return sio.getvalue()

    def test_valid_knit_data(self):
        sha1sum = sha.new('foo\nbar\n').hexdigest()
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
                                        'foo\n'
                                        'bar\n'
                                        'end rev-id-1\n'
                                        % (sha1sum,))
        transport = MockTransport([gz_txt])
        data = _KnitData(transport, 'filename', mode='r')
        records = [('rev-id-1', 0, len(gz_txt))]

        contents = data.read_records(records)
        self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)

        raw_contents = list(data.read_records_iter_raw(records))
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)

    def test_not_enough_lines(self):
        sha1sum = sha.new('foo\n').hexdigest()
        # record says 2 lines data says 1
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
                                        'foo\n'
                                        'end rev-id-1\n'
                                        % (sha1sum,))
        transport = MockTransport([gz_txt])
        data = _KnitData(transport, 'filename', mode='r')
        records = [('rev-id-1', 0, len(gz_txt))]
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)

        # read_records_iter_raw won't detect that sort of mismatch/corruption
        raw_contents = list(data.read_records_iter_raw(records))
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)

    def test_too_many_lines(self):
        sha1sum = sha.new('foo\nbar\n').hexdigest()
        # record says 1 lines data says 2
        gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
                                        'foo\n'
                                        'bar\n'
                                        'end rev-id-1\n'
                                        % (sha1sum,))
        transport = MockTransport([gz_txt])
        data = _KnitData(transport, 'filename', mode='r')
        records = [('rev-id-1', 0, len(gz_txt))]
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)

        # read_records_iter_raw won't detect that sort of mismatch/corruption
        raw_contents = list(data.read_records_iter_raw(records))
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)

    def test_mismatched_version_id(self):
        sha1sum = sha.new('foo\nbar\n').hexdigest()
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
                                        'foo\n'
                                        'bar\n'
                                        'end rev-id-1\n'
                                        % (sha1sum,))
        transport = MockTransport([gz_txt])
        data = _KnitData(transport, 'filename', mode='r')
        # We are asking for rev-id-2, but the data is rev-id-1
        records = [('rev-id-2', 0, len(gz_txt))]
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)

        # read_records_iter_raw will notice if we request the wrong version.
        self.assertRaises(errors.KnitCorrupt, list,
                          data.read_records_iter_raw(records))

    def test_uncompressed_data(self):
        sha1sum = sha.new('foo\nbar\n').hexdigest()
        txt = ('version rev-id-1 2 %s\n'
               'foo\n'
               'bar\n'
               'end rev-id-1\n'
               % (sha1sum,))
        transport = MockTransport([txt])
        data = _KnitData(transport, 'filename', mode='r')
        records = [('rev-id-1', 0, len(txt))]

        # We don't have valid gzip data ==> corrupt
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)

        # read_records_iter_raw will notice the bad data
        self.assertRaises(errors.KnitCorrupt, list,
                          data.read_records_iter_raw(records))

    def test_corrupted_data(self):
        sha1sum = sha.new('foo\nbar\n').hexdigest()
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
                                        'foo\n'
                                        'bar\n'
                                        'end rev-id-1\n'
                                        % (sha1sum,))
        # Change 2 bytes in the middle to \xff
        gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
        transport = MockTransport([gz_txt])
        data = _KnitData(transport, 'filename', mode='r')
        records = [('rev-id-1', 0, len(gz_txt))]

        self.assertRaises(errors.KnitCorrupt, data.read_records, records)

        # read_records_iter_raw will also notice the damaged gzip stream.
        self.assertRaises(errors.KnitCorrupt, list,
                          data.read_records_iter_raw(records))
242
class LowLevelKnitIndexTests(TestCase):
244
def test_no_such_file(self):
245
transport = MockTransport()
247
self.assertRaises(NoSuchFile, _KnitIndex, transport, "filename", "r")
248
self.assertRaises(NoSuchFile, _KnitIndex, transport,
249
"filename", "w", create=False)
251
def test_create_file(self):
252
transport = MockTransport()
254
index = _KnitIndex(transport, "filename", "w",
255
file_mode="wb", create=True)
257
("put_bytes_non_atomic",
258
("filename", index.HEADER), {"mode": "wb"}),
259
transport.calls.pop(0))
261
def test_delay_create_file(self):
262
transport = MockTransport()
264
index = _KnitIndex(transport, "filename", "w",
265
create=True, file_mode="wb", create_parent_dir=True,
266
delay_create=True, dir_mode=0777)
267
self.assertEqual([], transport.calls)
269
index.add_versions([])
270
name, (filename, f), kwargs = transport.calls.pop(0)
271
self.assertEqual("put_file_non_atomic", name)
273
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
275
self.assertEqual("filename", filename)
276
self.assertEqual(index.HEADER, f.read())
278
index.add_versions([])
279
self.assertEqual(("append_bytes", ("filename", ""), {}),
280
transport.calls.pop(0))
282
def test_read_utf8_version_id(self):
283
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
284
utf8_revision_id = unicode_revision_id.encode('utf-8')
285
transport = MockTransport([
287
'%s option 0 1 :' % (utf8_revision_id,)
289
index = _KnitIndex(transport, "filename", "r")
290
# _KnitIndex is a private class, and deals in utf8 revision_ids, not
291
# Unicode revision_ids.
292
self.assertTrue(index.has_version(utf8_revision_id))
293
self.assertFalse(index.has_version(unicode_revision_id))
295
def test_read_utf8_parents(self):
296
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
297
utf8_revision_id = unicode_revision_id.encode('utf-8')
298
transport = MockTransport([
300
"version option 0 1 .%s :" % (utf8_revision_id,)
302
index = _KnitIndex(transport, "filename", "r")
303
self.assertEqual([utf8_revision_id],
304
index.get_parents_with_ghosts("version"))
306
def test_read_ignore_corrupted_lines(self):
307
transport = MockTransport([
310
"corrupted options 0 1 .b .c ",
311
"version options 0 1 :"
313
index = _KnitIndex(transport, "filename", "r")
314
self.assertEqual(1, index.num_versions())
315
self.assertTrue(index.has_version("version"))
317
def test_read_corrupted_header(self):
318
transport = MockTransport(['not a bzr knit index header\n'])
319
self.assertRaises(KnitHeaderError,
320
_KnitIndex, transport, "filename", "r")
322
def test_read_duplicate_entries(self):
323
transport = MockTransport([
325
"parent options 0 1 :",
326
"version options1 0 1 0 :",
327
"version options2 1 2 .other :",
328
"version options3 3 4 0 .other :"
330
index = _KnitIndex(transport, "filename", "r")
331
self.assertEqual(2, index.num_versions())
332
self.assertEqual(1, index.lookup("version"))
333
self.assertEqual((3, 4), index.get_position("version"))
334
self.assertEqual(["options3"], index.get_options("version"))
335
self.assertEqual(["parent", "other"],
336
index.get_parents_with_ghosts("version"))
338
def test_read_compressed_parents(self):
339
transport = MockTransport([
343
"c option 0 1 1 0 :",
345
index = _KnitIndex(transport, "filename", "r")
346
self.assertEqual(["a"], index.get_parents("b"))
347
self.assertEqual(["b", "a"], index.get_parents("c"))
349
def test_write_utf8_version_id(self):
350
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
351
utf8_revision_id = unicode_revision_id.encode('utf-8')
352
transport = MockTransport([
355
index = _KnitIndex(transport, "filename", "r")
356
index.add_version(utf8_revision_id, ["option"], 0, 1, [])
357
self.assertEqual(("append_bytes", ("filename",
358
"\n%s option 0 1 :" % (utf8_revision_id,)),
360
transport.calls.pop(0))
362
def test_write_utf8_parents(self):
363
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
364
utf8_revision_id = unicode_revision_id.encode('utf-8')
365
transport = MockTransport([
368
index = _KnitIndex(transport, "filename", "r")
369
index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
370
self.assertEqual(("append_bytes", ("filename",
371
"\nversion option 0 1 .%s :" % (utf8_revision_id,)),
373
transport.calls.pop(0))
375
def test_get_graph(self):
376
transport = MockTransport()
377
index = _KnitIndex(transport, "filename", "w", create=True)
378
self.assertEqual([], index.get_graph())
380
index.add_version("a", ["option"], 0, 1, ["b"])
381
self.assertEqual([("a", ["b"])], index.get_graph())
383
index.add_version("c", ["option"], 0, 1, ["d"])
384
self.assertEqual([("a", ["b"]), ("c", ["d"])],
385
sorted(index.get_graph()))
387
def test_get_ancestry(self):
388
transport = MockTransport([
391
"b option 0 1 0 .e :",
392
"c option 0 1 1 0 :",
393
"d option 0 1 2 .f :"
395
index = _KnitIndex(transport, "filename", "r")
397
self.assertEqual([], index.get_ancestry([]))
398
self.assertEqual(["a"], index.get_ancestry(["a"]))
399
self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
400
self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
401
self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
402
self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
403
self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
405
self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
407
def test_get_ancestry_with_ghosts(self):
408
transport = MockTransport([
411
"b option 0 1 0 .e :",
412
"c option 0 1 0 .f .g :",
413
"d option 0 1 2 .h .j .k :"
415
index = _KnitIndex(transport, "filename", "r")
417
self.assertEqual([], index.get_ancestry_with_ghosts([]))
418
self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
419
self.assertEqual(["a", "e", "b"],
420
index.get_ancestry_with_ghosts(["b"]))
421
self.assertEqual(["a", "g", "f", "c"],
422
index.get_ancestry_with_ghosts(["c"]))
423
self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
424
index.get_ancestry_with_ghosts(["d"]))
425
self.assertEqual(["a", "e", "b"],
426
index.get_ancestry_with_ghosts(["a", "b"]))
427
self.assertEqual(["a", "g", "f", "c"],
428
index.get_ancestry_with_ghosts(["a", "c"]))
430
["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
431
index.get_ancestry_with_ghosts(["b", "d"]))
433
self.assertRaises(RevisionNotPresent,
434
index.get_ancestry_with_ghosts, ["e"])
436
def test_num_versions(self):
437
transport = MockTransport([
440
index = _KnitIndex(transport, "filename", "r")
442
self.assertEqual(0, index.num_versions())
443
self.assertEqual(0, len(index))
445
index.add_version("a", ["option"], 0, 1, [])
446
self.assertEqual(1, index.num_versions())
447
self.assertEqual(1, len(index))
449
index.add_version("a", ["option2"], 1, 2, [])
450
self.assertEqual(1, index.num_versions())
451
self.assertEqual(1, len(index))
453
index.add_version("b", ["option"], 0, 1, [])
454
self.assertEqual(2, index.num_versions())
455
self.assertEqual(2, len(index))
457
def test_get_versions(self):
458
transport = MockTransport([
461
index = _KnitIndex(transport, "filename", "r")
463
self.assertEqual([], index.get_versions())
465
index.add_version("a", ["option"], 0, 1, [])
466
self.assertEqual(["a"], index.get_versions())
468
index.add_version("a", ["option"], 0, 1, [])
469
self.assertEqual(["a"], index.get_versions())
471
index.add_version("b", ["option"], 0, 1, [])
472
self.assertEqual(["a", "b"], index.get_versions())
474
def test_idx_to_name(self):
475
transport = MockTransport([
480
index = _KnitIndex(transport, "filename", "r")
482
self.assertEqual("a", index.idx_to_name(0))
483
self.assertEqual("b", index.idx_to_name(1))
484
self.assertEqual("b", index.idx_to_name(-1))
485
self.assertEqual("a", index.idx_to_name(-2))
487
def test_lookup(self):
488
transport = MockTransport([
493
index = _KnitIndex(transport, "filename", "r")
495
self.assertEqual(0, index.lookup("a"))
496
self.assertEqual(1, index.lookup("b"))
498
def test_add_version(self):
499
transport = MockTransport([
502
index = _KnitIndex(transport, "filename", "r")
504
index.add_version("a", ["option"], 0, 1, ["b"])
505
self.assertEqual(("append_bytes",
506
("filename", "\na option 0 1 .b :"),
507
{}), transport.calls.pop(0))
508
self.assertTrue(index.has_version("a"))
509
self.assertEqual(1, index.num_versions())
510
self.assertEqual((0, 1), index.get_position("a"))
511
self.assertEqual(["option"], index.get_options("a"))
512
self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
514
index.add_version("a", ["opt"], 1, 2, ["c"])
515
self.assertEqual(("append_bytes",
516
("filename", "\na opt 1 2 .c :"),
517
{}), transport.calls.pop(0))
518
self.assertTrue(index.has_version("a"))
519
self.assertEqual(1, index.num_versions())
520
self.assertEqual((1, 2), index.get_position("a"))
521
self.assertEqual(["opt"], index.get_options("a"))
522
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
524
index.add_version("b", ["option"], 2, 3, ["a"])
525
self.assertEqual(("append_bytes",
526
("filename", "\nb option 2 3 0 :"),
527
{}), transport.calls.pop(0))
528
self.assertTrue(index.has_version("b"))
529
self.assertEqual(2, index.num_versions())
530
self.assertEqual((2, 3), index.get_position("b"))
531
self.assertEqual(["option"], index.get_options("b"))
532
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
534
def test_add_versions(self):
535
transport = MockTransport([
538
index = _KnitIndex(transport, "filename", "r")
541
("a", ["option"], 0, 1, ["b"]),
542
("a", ["opt"], 1, 2, ["c"]),
543
("b", ["option"], 2, 3, ["a"])
545
self.assertEqual(("append_bytes", ("filename",
546
"\na option 0 1 .b :"
549
), {}), transport.calls.pop(0))
550
self.assertTrue(index.has_version("a"))
551
self.assertTrue(index.has_version("b"))
552
self.assertEqual(2, index.num_versions())
553
self.assertEqual((1, 2), index.get_position("a"))
554
self.assertEqual((2, 3), index.get_position("b"))
555
self.assertEqual(["opt"], index.get_options("a"))
556
self.assertEqual(["option"], index.get_options("b"))
557
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
558
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
560
def test_delay_create_and_add_versions(self):
561
transport = MockTransport()
563
index = _KnitIndex(transport, "filename", "w",
564
create=True, file_mode="wb", create_parent_dir=True,
565
delay_create=True, dir_mode=0777)
566
self.assertEqual([], transport.calls)
569
("a", ["option"], 0, 1, ["b"]),
570
("a", ["opt"], 1, 2, ["c"]),
571
("b", ["option"], 2, 3, ["a"])
573
name, (filename, f), kwargs = transport.calls.pop(0)
574
self.assertEqual("put_file_non_atomic", name)
576
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
578
self.assertEqual("filename", filename)
581
"\na option 0 1 .b :"
583
"\nb option 2 3 0 :",
586
def test_has_version(self):
587
transport = MockTransport([
591
index = _KnitIndex(transport, "filename", "r")
593
self.assertTrue(index.has_version("a"))
594
self.assertFalse(index.has_version("b"))
596
def test_get_position(self):
597
transport = MockTransport([
602
index = _KnitIndex(transport, "filename", "r")
604
self.assertEqual((0, 1), index.get_position("a"))
605
self.assertEqual((1, 2), index.get_position("b"))
607
def test_get_method(self):
608
transport = MockTransport([
610
"a fulltext,unknown 0 1 :",
611
"b unknown,line-delta 1 2 :",
614
index = _KnitIndex(transport, "filename", "r")
616
self.assertEqual("fulltext", index.get_method("a"))
617
self.assertEqual("line-delta", index.get_method("b"))
618
self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
620
def test_get_options(self):
621
transport = MockTransport([
626
index = _KnitIndex(transport, "filename", "r")
628
self.assertEqual(["opt1"], index.get_options("a"))
629
self.assertEqual(["opt2", "opt3"], index.get_options("b"))
631
def test_get_parents(self):
632
transport = MockTransport([
635
"b option 1 2 0 .c :",
636
"c option 1 2 1 0 .e :"
638
index = _KnitIndex(transport, "filename", "r")
640
self.assertEqual([], index.get_parents("a"))
641
self.assertEqual(["a", "c"], index.get_parents("b"))
642
self.assertEqual(["b", "a"], index.get_parents("c"))
644
def test_get_parents_with_ghosts(self):
645
transport = MockTransport([
648
"b option 1 2 0 .c :",
649
"c option 1 2 1 0 .e :"
651
index = _KnitIndex(transport, "filename", "r")
653
self.assertEqual([], index.get_parents_with_ghosts("a"))
654
self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
655
self.assertEqual(["b", "a", "e"],
656
index.get_parents_with_ghosts("c"))
658
def test_check_versions_present(self):
659
transport = MockTransport([
664
index = _KnitIndex(transport, "filename", "r")
666
check = index.check_versions_present
672
self.assertRaises(RevisionNotPresent, check, ["c"])
673
self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
676
class KnitTests(TestCaseWithTransport):
    """Class containing knit test helper routines."""

    def make_test_knit(self, annotate=False, delay_create=False):
        """Create a writable knit named 'test' in the current directory.

        :param annotate: when true build the knit with an annotating
            factory, otherwise with a plain one. Callers that later call
            ``annotate()`` pass True.
        :param delay_create: forwarded to KnitVersionedFile.
        :return: the new KnitVersionedFile.
        """
        # The flag has to select the factory; a plain factory for every
        # caller would break the tests that rely on annotation.
        if not annotate:
            factory = KnitPlainFactory()
        else:
            factory = KnitAnnotateFactory()
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
                                 factory=factory, create=True,
                                 delay_create=delay_create)
689
class BasicKnitTests(KnitTests):
691
def add_stock_one_and_one_a(self, k):
    """Add 'text-1' (no parents) and 'text-1a' (child of 'text-1') to k."""
    stock_versions = [
        ('text-1', [], TEXT_1),
        ('text-1a', ['text-1'], TEXT_1A),
        ]
    for version_id, parents, text in stock_versions:
        k.add_lines(version_id, parents, split_lines(text))
695
def test_knit_constructor(self):
    """Construct empty k"""
    # Passing means construction through the shared helper did not raise.
    self.make_test_knit()
699
def test_knit_add(self):
    """Store one text in knit and retrieve"""
    knit = self.make_test_knit()
    knit.add_lines('text-1', [], split_lines(TEXT_1))
    self.assertTrue(knit.has_version('text-1'))
    stored_text = ''.join(knit.get_lines('text-1'))
    self.assertEqualDiff(stored_text, TEXT_1)
706
def test_knit_reload(self):
    # test that the content in a reloaded knit is correct
    writer = self.make_test_knit()
    writer.add_lines('text-1', [], split_lines(TEXT_1))
    # Open the same knit again, read-only, through a fresh object.
    reader = KnitVersionedFile('test', get_transport('.'), access_mode='r',
                               factory=KnitPlainFactory(), create=True)
    self.assertTrue(reader.has_version('text-1'))
    reloaded_text = ''.join(reader.get_lines('text-1'))
    self.assertEqualDiff(reloaded_text, TEXT_1)
715
def test_knit_several(self):
    """Store several texts in a knit"""
    knit = self.make_test_knit()
    knit.add_lines('text-1', [], split_lines(TEXT_1))
    knit.add_lines('text-2', [], split_lines(TEXT_2))
    # Each version must come back as exactly the text it was stored with.
    for version_id, expected in [('text-1', TEXT_1), ('text-2', TEXT_2)]:
        self.assertEqualDiff(''.join(knit.get_lines(version_id)), expected)
723
def test_repeated_add(self):
    """Knit traps attempt to replace existing version"""
    k = self.make_test_knit()
    k.add_lines('text-1', [], split_lines(TEXT_1))
    # assertRaises needs the callable first; the remaining arguments are
    # passed to it. Adding 'text-1' a second time must be rejected.
    self.assertRaises(RevisionAlreadyPresent,
            k.add_lines,
            'text-1', [], split_lines(TEXT_1))
731
def test_empty(self):
    # A version with no lines at all must round-trip as an empty list.
    knit = self.make_test_knit(True)
    knit.add_lines('text-1', [], [])
    stored_lines = knit.get_lines('text-1')
    self.assertEquals(stored_lines, [])
736
def test_incomplete(self):
    """Test if texts without a ending line-end can be inserted and
    retrieved.
    """
    # NOTE(review): the end of the docstring was lost; "retrieved" is
    # inferred from the get_lines assertions below.
    k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
    k.add_lines('text-1', [], ['a\n',    'b'  ])
    k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
    # reopening ensures maximum room for confusion
    k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
    self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
    self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
747
def test_delta(self):
    """Expression of knit delta as lines"""
    k = self.make_test_knit()

    old_lines = TEXT_1.splitlines(True)
    new_lines = TEXT_1A.splitlines(True)
    td = list(line_delta(old_lines, new_lines))
    self.assertEqualDiff(''.join(td), delta_1_1a)
    # Applying the delta to a fresh copy of the old lines must give the
    # new text.
    out = apply_line_delta(TEXT_1.splitlines(True), td)
    self.assertEqualDiff(''.join(out), TEXT_1A)
757
def assertDerivedBlocksEqual(self, source, target, noeol=False):
    """Assert that the derived matching blocks match real output

    :param source: the old text, as a single string.
    :param target: the new text, as a single string.
    :param noeol: when true, build the knit contents with a newline added
        to any line that lacks one, while the blocks are still derived
        against the raw lines.
    """
    source_lines = source.splitlines(True)
    target_lines = target.splitlines(True)

    # NOTE(review): the header and return statements of this helper were
    # lost; only its condition survived. The reconstruction assumes it
    # appends the missing newline - confirm against the upstream test.
    def nl(line):
        if noeol and not line.endswith('\n'):
            return line + '\n'
        else:
            return line

    source_content = KnitContent([(None, nl(l)) for l in source_lines])
    target_content = KnitContent([(None, nl(l)) for l in target_lines])
    line_delta = source_content.line_delta(target_content)
    delta_blocks = list(KnitContent.get_line_delta_blocks(line_delta,
        source_lines, target_lines))
    matcher = KnitSequenceMatcher(None, source_lines, target_lines)
    matcher_blocks = list(matcher.get_matching_blocks())
    self.assertEqual(matcher_blocks, delta_blocks)
775
def test_get_line_delta_blocks(self):
    # (source, target) pairs, checked in this order.
    text_pairs = [
        ('a\nb\nc\n', 'q\nc\n'),
        (TEXT_1, TEXT_1),
        (TEXT_1, TEXT_1A),
        (TEXT_1, TEXT_1B),
        (TEXT_1B, TEXT_1A),
        (TEXT_1A, TEXT_1B),
        (TEXT_1A, ''),
        ('', TEXT_1A),
        ('', ''),
        ('a\nb\nc', 'a\nb\nc\nd'),
        ]
    for source, target in text_pairs:
        self.assertDerivedBlocksEqual(source, target)
787
def test_get_line_delta_blocks_noeol(self):
    """Handle historical knit deltas safely

    Some existing knit deltas don't consider the last line to differ
    when the only difference whether it has a final newline.

    New knit deltas appear to always consider the last line to differ
    """
    # NOTE(review): the closing lines of the docstring were lost; it is
    # terminated after the last surviving sentence.
    self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd\n', noeol=True)
    self.assertDerivedBlocksEqual('a\nb\nc\nd\n', 'a\nb\nc', noeol=True)
    self.assertDerivedBlocksEqual('a\nb\nc\n', 'a\nb\nc', noeol=True)
    self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\n', noeol=True)
801
def test_add_with_parents(self):
    """Store in knit with parents"""
    knit = self.make_test_knit()
    self.add_stock_one_and_one_a(knit)
    root_parents = knit.get_parents('text-1')
    self.assertEquals(root_parents, [])
    child_parents = knit.get_parents('text-1a')
    self.assertEquals(child_parents, ['text-1'])
808
def test_ancestry(self):
    """Store in knit with parents"""
    knit = self.make_test_knit()
    self.add_stock_one_and_one_a(knit)
    # Compared as sets, so the order of the ancestry is not checked.
    ancestry = set(knit.get_ancestry(['text-1a']))
    self.assertEquals(ancestry, set(['text-1a', 'text-1']))
814
def test_add_delta(self):
    """Store in knit with parents"""
    knit = KnitVersionedFile('test', get_transport('.'),
                             factory=KnitPlainFactory(),
                             delta=True, create=True)
    self.add_stock_one_and_one_a(knit)
    # The knit was created with delta=True; the child text must still read
    # back whole.
    child_text = ''.join(knit.get_lines('text-1a'))
    self.assertEqualDiff(child_text, TEXT_1A)
822
def test_annotate(self):
    # Runs the shared small annotation scenario on a delta=True knit.
    annotated_knit = KnitVersionedFile('knit', get_transport('.'),
                                       factory=KnitAnnotateFactory(),
                                       delta=True, create=True)
    self.insert_and_test_small_annotate(annotated_knit)
828
def insert_and_test_small_annotate(self, k):
    """test annotation with k works correctly."""
    k.add_lines('text-1', [], ['a\n', 'b\n'])
    k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])

    origins = k.annotate('text-2')
    # The unchanged line keeps its original origin; the changed line is
    # attributed to the new version.
    expected_origins = [('text-1', 'a\n'), ('text-2', 'c\n')]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(origins[position], expected)
837
def test_annotate_fulltext(self):
    # Same scenario as test_annotate, but on a delta=False knit.
    fulltext_knit = KnitVersionedFile('knit', get_transport('.'),
                                      factory=KnitAnnotateFactory(),
                                      delta=False, create=True)
    self.insert_and_test_small_annotate(fulltext_knit)
843
def test_annotate_merge_1(self):
    # A two-parent merge taking one line from each parent.
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n'])
    knit.add_lines('text-a2', [], ['d\n', 'c\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
    annotated = knit.annotate('text-am')
    expected_origins = [('text-a2', 'd\n'), ('text-a1', 'b\n')]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(annotated[position], expected)
852
def test_annotate_merge_2(self):
    # The merge interleaves lines: first parent, second parent, first.
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
    annotated = knit.annotate('text-am')
    expected_origins = [
        ('text-a1', 'a\n'),
        ('text-a2', 'y\n'),
        ('text-a1', 'c\n'),
        ]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(annotated[position], expected)
862
def test_annotate_merge_9(self):
    # A line present in neither parent is attributed to the merge itself.
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
    annotated = knit.annotate('text-am')
    expected_origins = [
        ('text-am', 'k\n'),
        ('text-a2', 'y\n'),
        ('text-a1', 'c\n'),
        ]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(annotated[position], expected)
872
def test_annotate_merge_3(self):
    # One new line followed by two lines kept from the second parent.
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
    annotated = knit.annotate('text-am')
    expected_origins = [
        ('text-am', 'k\n'),
        ('text-a2', 'y\n'),
        ('text-a2', 'z\n'),
        ]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(annotated[position], expected)
882
def test_annotate_merge_4(self):
    # Lines inherited through text-a3 are still attributed to text-a1,
    # where they first appeared.
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
    knit.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
    annotated = knit.annotate('text-am')
    expected_origins = [
        ('text-a1', 'a\n'),
        ('text-a1', 'b\n'),
        ('text-a2', 'z\n'),
        ]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(annotated[position], expected)
893
def test_annotate_merge_5(self):
    # A three-parent merge taking exactly one line from each parent.
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
    knit.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
    merge_parents = ['text-a1', 'text-a2', 'text-a3']
    merged_lines = ['a\n', 'e\n', 'z\n']
    knit.add_lines('text-am', merge_parents, merged_lines)
    annotated = knit.annotate('text-am')
    expected_origins = [
        ('text-a1', 'a\n'),
        ('text-a2', 'e\n'),
        ('text-a3', 'z\n'),
        ]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(annotated[position], expected)
906
def test_annotate_file_cherry_pick(self):
    # text-3 restores the text-1 content after text-2 replaced it; the
    # lines must be attributed to text-1.
    knit = self.make_test_knit(True)
    knit.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
    knit.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
    annotated = knit.annotate('text-3')
    expected_origins = [
        ('text-1', 'a\n'),
        ('text-1', 'b\n'),
        ('text-1', 'c\n'),
        ]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(annotated[position], expected)
916
def test_knit_join(self):
    """Store in knit with parents"""
    source = KnitVersionedFile('test1', get_transport('.'),
                               factory=KnitPlainFactory(), create=True)
    # Two independent two-version chains, merged by 'text-m'.
    version_graph = [
        ('text-a', []),
        ('text-b', ['text-a']),
        ('text-c', []),
        ('text-d', ['text-c']),
        ('text-m', ['text-b', 'text-d']),
        ]
    for version_id, parents in version_graph:
        source.add_lines(version_id, parents, split_lines(TEXT_1))

    target = KnitVersionedFile('test2', get_transport('.'),
                               factory=KnitPlainFactory(), create=True)
    count = target.join(source, version_ids=['text-m'])
    # Asking for the merge alone must bring across all five versions.
    self.assertEquals(count, 5)
    self.assertTrue(target.has_version('text-a'))
    self.assertTrue(target.has_version('text-c'))
933
def test_reannotate(self):
    # Two annotated knits diverge after sharing 'text-b'; joining
    # 'text-c' back must keep its annotations correct.
    first = KnitVersionedFile('knit1', get_transport('.'),
                              factory=KnitAnnotateFactory(), create=True)
    first.add_lines('text-a', [], ['a\n', 'b\n'])
    first.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])

    second = KnitVersionedFile('test2', get_transport('.'),
                               factory=KnitAnnotateFactory(), create=True)
    second.join(first, version_ids=['text-b'])

    # Each knit now gains versions the other does not have.
    first.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
    second.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
    second.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])

    # test-c will have index 3
    first.join(second, version_ids=['text-c'])

    lines = first.get_lines('text-c')
    self.assertEquals(lines, ['z\n', 'c\n'])

    annotated = first.annotate('text-c')
    expected_origins = [('text-c', 'z\n'), ('text-b', 'c\n')]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(annotated[position], expected)
962
def test_get_line_delta_texts(self):
    """Make sure we can call get_texts on text with reused line deltas"""
    k1 = KnitVersionedFile('test1', get_transport('.'),
                           factory=KnitPlainFactory(), create=True)
    # NOTE(review): the loop header and the first branch were lost. They
    # are rebuilt from the range(3) used by get_texts below; version '0'
    # can have no parent because no earlier version exists.
    for t in range(3):
        if t == 0:
            parents = []
        else:
            parents = ['%d' % (t-1)]
        k1.add_lines('%d' % t, parents, ['hello\n'] * t)
    k1.get_texts(('%d' % t) for t in range(3))
974
def test_iter_lines_reads_in_order(self):
    # Records must be read in file order whatever order is requested.
    logged_transport = TransportLogger(MemoryTransport())
    knit = KnitVersionedFile('id', logged_transport, create=True, delta=True)
    self.assertEqual([('id.kndx',)], logged_transport._calls)
    # add texts with no required ordering
    knit.add_lines('base', [], ['text\n'])
    knit.add_lines('base2', [], ['text2\n'])
    # Forget the calls made so far so only the reads below are checked.
    logged_transport._calls = []
    # request a last-first iteration
    requested = ['base2', 'base']
    results = list(knit.iter_lines_added_or_present_in_versions(requested))
    self.assertEqual([('id.knit', [(0, 87), (87, 89)])],
                     logged_transport._calls)
    self.assertEqual(['text\n', 'text2\n'], results)
989
def test_create_empty_annotated(self):
    """create_empty() on an annotated knit keeps its factory kind and delta."""
    annotated = self.make_test_knit(True)
    annotated.add_lines('text-a', [], ['a\n', 'b\n'])

    fresh = annotated.create_empty('t', MemoryTransport())

    uses_annotate_factory = isinstance(fresh.factory, KnitAnnotateFactory)
    self.assertTrue(uses_annotate_factory)
    self.assertEqual(annotated.delta, fresh.delta)
    # the generic test checks for empty content and file class
998
def test_knit_format(self):
999
# this tests that a new knit index file has the expected content
1000
# and that it writes the data we expect as records are added.
1001
knit = self.make_test_knit(True)
1002
# Now knit files are not created until we first add data to them
1003
self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
1004
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
1005
self.assertFileEqual(
1006
"# bzr knit index 8\n"
1008
"revid fulltext 0 84 .a_ghost :",
1010
knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
1011
self.assertFileEqual(
1012
"# bzr knit index 8\n"
1013
"\nrevid fulltext 0 84 .a_ghost :"
1014
"\nrevid2 line-delta 84 82 0 :",
1016
# we should be able to load this file again
1017
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
1018
self.assertEqual(['revid', 'revid2'], knit.versions())
1019
# write a short write to the file and ensure that it's ignored
1020
indexfile = file('test.kndx', 'at')
1021
indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
1023
# we should be able to load this file again
1024
knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
1025
self.assertEqual(['revid', 'revid2'], knit.versions())
1026
# and add a revision with the same id the failed write had
1027
knit.add_lines('revid3', ['revid2'], ['a\n'])
1028
# and when reading it revid3 should now appear.
1029
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
1030
self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
1031
self.assertEqual(['revid2'], knit.get_parents('revid3'))
1033
def test_delay_create(self):
1034
"""Test that passing delay_create=True creates files late"""
1035
knit = self.make_test_knit(annotate=True, delay_create=True)
1036
self.failIfExists('test.knit')
1037
self.failIfExists('test.kndx')
1038
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
1039
self.failUnlessExists('test.knit')
1040
self.assertFileEqual(
1041
"# bzr knit index 8\n"
1043
"revid fulltext 0 84 .a_ghost :",
1046
def test_create_parent_dir(self):
1047
"""create_parent_dir can create knits in nonexistant dirs"""
1048
# Has no effect if we don't set 'delay_create'
1049
trans = get_transport('.')
1050
self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
1051
trans, access_mode='w', factory=None,
1052
create=True, create_parent_dir=True)
1053
# Nothing should have changed yet
1054
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1055
factory=None, create=True,
1056
create_parent_dir=True,
1058
self.failIfExists('dir/test.knit')
1059
self.failIfExists('dir/test.kndx')
1060
self.failIfExists('dir')
1061
knit.add_lines('revid', [], ['a\n'])
1062
self.failUnlessExists('dir')
1063
self.failUnlessExists('dir/test.knit')
1064
self.assertFileEqual(
1065
"# bzr knit index 8\n"
1067
"revid fulltext 0 84 :",
1070
def test_create_mode_700(self):
1071
trans = get_transport('.')
1072
if not trans._can_roundtrip_unix_modebits():
1073
# Can't roundtrip, so no need to run this test
1075
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1076
factory=None, create=True,
1077
create_parent_dir=True,
1081
knit.add_lines('revid', [], ['a\n'])
1082
self.assertTransportMode(trans, 'dir', 0700)
1083
self.assertTransportMode(trans, 'dir/test.knit', 0600)
1084
self.assertTransportMode(trans, 'dir/test.kndx', 0600)
1086
def test_create_mode_770(self):
1087
trans = get_transport('.')
1088
if not trans._can_roundtrip_unix_modebits():
1089
# Can't roundtrip, so no need to run this test
1091
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1092
factory=None, create=True,
1093
create_parent_dir=True,
1097
knit.add_lines('revid', [], ['a\n'])
1098
self.assertTransportMode(trans, 'dir', 0770)
1099
self.assertTransportMode(trans, 'dir/test.knit', 0660)
1100
self.assertTransportMode(trans, 'dir/test.kndx', 0660)
1102
def test_create_mode_777(self):
1103
trans = get_transport('.')
1104
if not trans._can_roundtrip_unix_modebits():
1105
# Can't roundtrip, so no need to run this test
1107
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1108
factory=None, create=True,
1109
create_parent_dir=True,
1113
knit.add_lines('revid', [], ['a\n'])
1114
self.assertTransportMode(trans, 'dir', 0777)
1115
self.assertTransportMode(trans, 'dir/test.knit', 0666)
1116
self.assertTransportMode(trans, 'dir/test.kndx', 0666)
1118
def test_plan_merge(self):
    """plan_merge of text1a/text1b yields the lines listed in AB_MERGE."""
    knit = self.make_test_knit(annotate=True)
    # A common base followed by two versions that both descend from it.
    knit.add_lines('text1', [], split_lines(TEXT_1))
    for version_id, text in (('text1a', TEXT_1A), ('text1b', TEXT_1B)):
        knit.add_lines(version_id, ['text1'], split_lines(text))

    actual_plan = list(knit.plan_merge('text1a', 'text1b'))
    # Compared pairwise; zip() stops at the shorter of the two sequences.
    for actual, expected in zip(actual_plan, AB_MERGE):
        self.assertEqual(actual, expected)
1137
Banana cup cake recipe
1143
- self-raising flour
1147
Banana cup cake recipe
1149
- bananas (do not use plantains!!!)
1156
Banana cup cake recipe
1159
- self-raising flour
1172
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
1177
new-b|- bananas (do not use plantains!!!)
1178
unchanged|- broken tea cups
1179
new-a|- self-raising flour
1182
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
1185
def line_delta(from_lines, to_lines):
1186
"""Generate line-based delta from one text to another"""
1187
s = difflib.SequenceMatcher(None, from_lines, to_lines)
1188
for op in s.get_opcodes():
1189
if op[0] == 'equal':
1191
yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
1192
for i in range(op[3], op[4]):
1196
def apply_line_delta(basis_lines, delta_lines):
1197
"""Apply a line-based perfect diff
1199
basis_lines -- text to apply the patch to
1200
delta_lines -- diff instructions and content
1202
out = basis_lines[:]
1205
while i < len(delta_lines):
1207
a, b, c = map(long, l.split(','))
1209
out[offset+a:offset+b] = delta_lines[i:i+c]
1211
offset = offset + (b - a) + c
1215
class TestWeaveToKnit(KnitTests):
1217
def test_weave_to_knit_matches(self):
1218
# check that the WeaveToKnit is_compatible function
1219
# registers True for a Weave to a Knit.
1221
k = self.make_test_knit()
1222
self.failUnless(WeaveToKnit.is_compatible(w, k))
1223
self.failIf(WeaveToKnit.is_compatible(k, w))
1224
self.failIf(WeaveToKnit.is_compatible(w, w))
1225
self.failIf(WeaveToKnit.is_compatible(k, k))
1228
class TestKnitCaching(KnitTests):
1230
def create_knit(self, cache_add=False):
1231
k = self.make_test_knit(True)
1235
k.add_lines('text-1', [], split_lines(TEXT_1))
1236
k.add_lines('text-2', [], split_lines(TEXT_2))
1239
def test_no_caching(self):
1240
k = self.create_knit()
1241
# Nothing should be cached without setting 'enable_cache'
1242
self.assertEqual({}, k._data._cache)
1244
def test_cache_add_and_clear(self):
1245
k = self.create_knit(True)
1247
self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
1250
self.assertEqual({}, k._data._cache)
1252
def test_cache_data_read_raw(self):
1253
k = self.create_knit()
1255
# Now cache and read
1258
def read_one_raw(version):
1259
pos_map = k._get_components_positions([version])
1260
method, pos, size, next = pos_map[version]
1261
lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
1262
self.assertEqual(1, len(lst))
1265
val = read_one_raw('text-1')
1266
self.assertEqual({'text-1':val[1]}, k._data._cache)
1269
# After clear, new reads are not cached
1270
self.assertEqual({}, k._data._cache)
1272
val2 = read_one_raw('text-1')
1273
self.assertEqual(val, val2)
1274
self.assertEqual({}, k._data._cache)
1276
def test_cache_data_read(self):
1277
k = self.create_knit()
1279
def read_one(version):
1280
pos_map = k._get_components_positions([version])
1281
method, pos, size, next = pos_map[version]
1282
lst = list(k._data.read_records_iter([(version, pos, size)]))
1283
self.assertEqual(1, len(lst))
1286
# Now cache and read
1289
val = read_one('text-2')
1290
self.assertEqual(['text-2'], k._data._cache.keys())
1291
self.assertEqual('text-2', val[0])
1292
content, digest = k._data._parse_record('text-2',
1293
k._data._cache['text-2'])
1294
self.assertEqual(content, val[1])
1295
self.assertEqual(digest, val[2])
1298
self.assertEqual({}, k._data._cache)
1300
val2 = read_one('text-2')
1301
self.assertEqual(val, val2)
1302
self.assertEqual({}, k._data._cache)
1304
def test_cache_read(self):
1305
k = self.create_knit()
1308
text = k.get_text('text-1')
1309
self.assertEqual(TEXT_1, text)
1310
self.assertEqual(['text-1'], k._data._cache.keys())
1313
self.assertEqual({}, k._data._cache)
1315
text = k.get_text('text-1')
1316
self.assertEqual(TEXT_1, text)
1317
self.assertEqual({}, k._data._cache)
1320
class TestKnitIndex(KnitTests):
1322
def test_add_versions_dictionary_compresses(self):
1323
"""Adding versions to the index should update the lookup dict"""
1324
knit = self.make_test_knit()
1326
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1327
self.check_file_contents('test.kndx',
1328
'# bzr knit index 8\n'
1330
'a-1 fulltext 0 0 :'
1332
idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
1333
('a-3', ['fulltext'], 0, 0, ['a-2']),
1335
self.check_file_contents('test.kndx',
1336
'# bzr knit index 8\n'
1338
'a-1 fulltext 0 0 :\n'
1339
'a-2 fulltext 0 0 0 :\n'
1340
'a-3 fulltext 0 0 1 :'
1342
self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
1343
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
1344
'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
1345
'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
1348
def test_add_versions_fails_clean(self):
1349
"""If add_versions fails in the middle, it restores a pristine state.
1351
Any modifications that are made to the index are reset if all versions
1354
# This cheats a little bit by passing in a generator which will
1355
# raise an exception before the processing finishes
1356
# Other possibilities would be to have a version with the wrong number
1357
# of entries, or to make the backing transport unable to write any
1360
knit = self.make_test_knit()
1362
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1364
class StopEarly(Exception):
1367
def generate_failure():
1368
"""Add some entries and then raise an exception"""
1369
yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
1370
yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
1373
# Assert the pre-condition
1374
self.assertEqual(['a-1'], idx._history)
1375
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1377
self.assertRaises(StopEarly, idx.add_versions, generate_failure())
1379
# And it shouldn't be modified
1380
self.assertEqual(['a-1'], idx._history)
1381
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1383
def test_knit_index_ignores_empty_files(self):
1384
# There was a race condition in older bzr, where a ^C at the right time
1385
# could leave an empty .kndx file, which bzr would later claim was a
1386
# corrupted file since the header was not present. In reality, the file
1387
# just wasn't created, so it should be ignored.
1388
t = get_transport('.')
1389
t.put_bytes('test.kndx', '')
1391
knit = self.make_test_knit()
1393
def test_knit_index_checks_header(self):
1394
t = get_transport('.')
1395
t.put_bytes('test.kndx', '# not really a knit header\n\n')
1397
self.assertRaises(KnitHeaderError, self.make_test_knit)